Compare commits

...

9 Commits

Author        SHA1         Message                                                     Date
Anton Popov   e08a293412   Merge cef9eb80d9 into 7fd2207626                            2024-09-18 15:41:24 +02:00
Anton Popov   cef9eb80d9   better checkpoints for ColumnString                         2024-09-03 22:47:59 +00:00
Anton Popov   ae1a8393b0   add test for ColumnObject                                   2024-09-03 22:16:22 +00:00
Anton Popov   457b05aa9a   Merge remote-tracking branch 'upstream/master' into HEAD    2024-09-03 21:09:05 +00:00
Anton Popov   4cd8272186   fix rollback of ColumnDynamic                               2024-09-03 18:21:53 +00:00
Anton Popov   d932d0ae4f   fix performance of parsing row formats                      2024-09-02 14:16:00 +00:00
Anton Popov   b7fccd8617   Merge remote-tracking branch 'upstream/master' into HEAD    2024-08-30 12:43:22 +00:00
Anton Popov   eb0ae55e02   better rollbacks of columns                                 2024-08-28 21:36:46 +00:00
Anton Popov   c39d7092d0   better rollbacks of columns                                 2024-08-27 16:47:54 +00:00
34 changed files with 626 additions and 57 deletions

View File

@ -369,6 +369,23 @@ void ColumnArray::popBack(size_t n)
offsets_data.resize_assume_reserved(offsets_data.size() - n);
}
ColumnCheckpointPtr ColumnArray::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
}
void ColumnArray::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
getData().updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getData().rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
{
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);
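Because ColumnArray stores rows as offsets over a flat nested data column, a plain row count is not enough to restore it; the checkpoint above therefore carries a nested checkpoint for the data column. A hedged gtest-style sketch of the observable behaviour, in the spirit of the unit tests added later in this diff; include paths are assumptions.

#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Field.h>
#include <gtest/gtest.h>

using namespace DB;

TEST(ColumnArray, RollbackSketch)
{
    auto col = ColumnArray::create(ColumnUInt64::create());
    col->insert(Array({1u, 2u, 3u}));

    auto checkpoint = col->getCheckpoint();   /// size = 1 row, nested checkpoint covers 3 data elements

    col->insert(Array({4u, 5u}));
    col->rollback(*checkpoint);

    ASSERT_EQ(col->size(), 1);                /// offsets trimmed to checkpoint.size
    ASSERT_EQ(col->getData().size(), 3);      /// nested data trimmed via the nested checkpoint
}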

View File

@ -161,6 +161,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(offsets);

View File

@ -979,6 +979,92 @@ ColumnPtr ColumnDynamic::compress() const
});
}
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & nested = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
const auto & variants = variant_column_ptr->getVariants();
size_t old_size = nested.size();
chassert(old_size <= variants.size());
for (size_t i = 0; i < old_size; ++i)
{
variants[i]->updateCheckpoint(*nested[i]);
}
/// If the column has new variants since the last checkpoint, create checkpoints for them.
if (old_size < variants.size())
{
nested.resize(variants.size());
for (size_t i = old_size; i < variants.size(); ++i)
nested[i] = variants[i]->getCheckpoint();
}
checkpoint.size = size();
}
DataTypePtr ColumnDynamic::popBackVariants(const VariantInfo & info, const std::vector<ColumnVariant::Discriminator> & local_to_global_discriminators, size_t n)
{
const auto & type_variant = assert_cast<const DataTypeVariant &>(*info.variant_type);
std::unordered_map<ColumnVariant::Discriminator, String> discriminator_to_name;
std::unordered_map<String, DataTypePtr> name_to_data_type;
for (const auto & [name, discriminator] : info.variant_name_to_discriminator)
discriminator_to_name.emplace(discriminator, name);
for (const auto & type : type_variant.getVariants())
name_to_data_type.emplace(type->getName(), type);
/// Remove the last n variants according to the global discriminators.
/// This code relies on the invariant that new variants are always added to the end in ColumnVariant.
for (auto it = local_to_global_discriminators.rbegin(); it < local_to_global_discriminators.rbegin() + n; ++it)
discriminator_to_name.erase(*it);
DataTypes new_variants;
for (const auto & [d, name] : discriminator_to_name)
new_variants.push_back(name_to_data_type.at(name));
return std::make_shared<DataTypeVariant>(std::move(new_variants));
}
void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(nested.size() <= variant_column_ptr->getNumVariants());
/// The structure hasn't changed, so we can use the generic rollback of the Variant column.
if (nested.size() == variant_column_ptr->getNumVariants())
{
variant_column_ptr->rollback(checkpoint);
return;
}
auto new_subcolumns = variant_column_ptr->getVariants();
auto new_discriminators_map = variant_column_ptr->getLocalToGlobalDiscriminatorsMapping();
auto new_discriminators_column = variant_column_ptr->getLocalDiscriminatorsPtr();
auto new_offsets_column = variant_column_ptr->getOffsetsPtr();
/// Remove new variants that were added since the last checkpoint.
auto new_variant_type = popBackVariants(variant_info, new_discriminators_map, variant_column_ptr->getNumVariants() - nested.size());
createVariantInfo(new_variant_type);
variant_mappings_cache.clear();
new_subcolumns.resize(nested.size());
new_discriminators_map.resize(nested.size());
/// Manually roll back the internals of the Variant column.
new_discriminators_column->assumeMutable()->popBack(new_discriminators_column->size() - checkpoint.size);
new_offsets_column->assumeMutable()->popBack(new_offsets_column->size() - checkpoint.size);
for (size_t i = 0; i < nested.size(); ++i)
new_subcolumns[i]->rollback(*nested[i]);
variant_column = ColumnVariant::create(new_discriminators_column, new_offsets_column, Columns(new_subcolumns.begin(), new_subcolumns.end()), new_discriminators_map);
variant_column_ptr = assert_cast<ColumnVariant *>(variant_column.get());
}
String ColumnDynamic::getTypeNameAt(size_t row_num) const
{
const auto & variant_col = getVariantColumn();
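ColumnDynamic::rollback above has two branches: when no new variant types appeared after the checkpoint it simply delegates to the Variant column, otherwise it rebuilds the variant type via popBackVariants, relying on new variants always being appended at the end. A minimal hedged sketch of the structural branch (the full behaviour is exercised by the ColumnDynamic rollback unit test later in this diff); include paths are assumptions.

#include <Columns/ColumnDynamic.h>
#include <Core/Field.h>

using namespace DB;

void dynamicRollbackSketch()
{
    auto column = ColumnDynamic::create(2);

    column->insert(Field(42));                  /// creates an Int64 variant
    auto checkpoint = column->getCheckpoint();  /// nested checkpoints only for the current variants

    column->insert(Field("str"));               /// a new String variant appears after the checkpoint

    /// nested.size() < getNumVariants(), so the "structure changed" branch runs:
    /// the String variant is dropped via popBackVariants and the Variant column is rebuilt.
    column->rollback(*checkpoint);
}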

View File

@ -304,6 +304,15 @@ public:
variant_column_ptr->protect();
}
ColumnCheckpointPtr getCheckpoint() const override
{
return variant_column_ptr->getCheckpoint();
}
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(variant_column);
@ -444,6 +453,8 @@ private:
void updateVariantInfoAndExpandVariantColumn(const DataTypePtr & new_variant_type);
static DataTypePtr popBackVariants(const VariantInfo & info, const std::vector<ColumnVariant::Discriminator> & local_to_global_discriminators, size_t n);
WrappedPtr variant_column;
/// Store and use pointer to ColumnVariant to avoid virtual calls.
/// ColumnDynamic is widely used inside ColumnObject for each path and

View File

@ -312,6 +312,21 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
max = std::move(map_max_value);
}
ColumnCheckpointPtr ColumnMap::getCheckpoint() const
{
return nested->getCheckpoint();
}
void ColumnMap::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
nested->updateCheckpoint(checkpoint);
}
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
{
nested->rollback(checkpoint);
}
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
{
callback(nested);

View File

@ -102,6 +102,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -305,6 +305,23 @@ void ColumnNullable::popBack(size_t n)
getNullMapColumn().popBack(n);
}
ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
}
void ColumnNullable::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
nested_column->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
{
getNullMapData().resize_assume_reserved(checkpoint.size);
nested_column->rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
{
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);

View File

@ -143,6 +143,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(nested_column);

View File

@ -30,6 +30,23 @@ const std::shared_ptr<SerializationDynamic> & getDynamicSerialization()
return dynamic_serialization;
}
struct ColumnObjectCheckpoint : public ColumnCheckpoint
{
using CheckpointsMap = std::unordered_map<String, ColumnCheckpointPtr>;
ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_)
: ColumnCheckpoint(size_)
, typed_paths(std::move(typed_paths_))
, dynamic_paths(std::move(dynamic_paths_))
, shared_data(std::move(shared_data_))
{
}
CheckpointsMap typed_paths;
CheckpointsMap dynamic_paths;
ColumnCheckpointPtr shared_data;
};
}
ColumnObject::ColumnObject(
@ -655,6 +672,69 @@ void ColumnObject::popBack(size_t n)
shared_data->popBack(n);
}
ColumnCheckpointPtr ColumnObject::getCheckpoint() const
{
auto get_checkpoints = [](const auto & columns)
{
std::unordered_map<String, ColumnCheckpointPtr> checkpoints;
for (const auto & [name, column] : columns)
checkpoints[name] = column->getCheckpoint();
return checkpoints;
};
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
}
void ColumnObject::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & object_checkpoint = assert_cast<ColumnObjectCheckpoint &>(checkpoint);
auto update_checkpoints = [&](const auto & columns_map, auto & checkpoints_map)
{
for (const auto & [name, column] : columns_map)
{
auto & nested = checkpoints_map[name];
if (!nested)
nested = column->getCheckpoint();
else
column->updateCheckpoint(*nested);
}
};
checkpoint.size = size();
update_checkpoints(typed_paths, object_checkpoint.typed_paths);
update_checkpoints(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->updateCheckpoint(*object_checkpoint.shared_data);
}
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
auto rollback_columns = [&](auto & columns_map, const auto & checkpoints_map)
{
NameSet names_to_remove;
/// Roll back subcolumns and remove paths that were not in the checkpoint.
for (auto & [name, column] : columns_map)
{
auto it = checkpoints_map.find(name);
if (it == checkpoints_map.end())
names_to_remove.insert(name);
else
column->rollback(*it->second);
}
for (const auto & name : names_to_remove)
columns_map.erase(name);
};
rollback_columns(typed_paths, object_checkpoint.typed_paths);
rollback_columns(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->rollback(*object_checkpoint.shared_data);
}
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
{
StringRef res(begin, 0);
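One subtlety in updateCheckpoint above: a dynamic path that appears only after the checkpoint was created gets its nested checkpoint lazily on the next update, so a later rollback trims that path instead of erasing it, while paths the checkpoint has never seen are removed entirely. A hedged sketch of that behaviour, assuming the column's dynamic-path limit is not yet exhausted; include paths are assumptions.

#include <Columns/ColumnObject.h>
#include <Core/Field.h>

using namespace DB;

void objectCheckpointUpdateSketch(ColumnObject & col)
{
    auto checkpoint = col.getCheckpoint();      /// checkpoints only for the paths that exist now

    col.insert(Object{{"a.new", Field{1u}}});   /// "a.new" becomes a new dynamic path (assumed below the path limit)
    col.updateCheckpoint(*checkpoint);          /// the lazy branch creates a nested checkpoint for "a.new"

    col.insert(Object{{"a.new", Field{2u}}});
    col.rollback(*checkpoint);                  /// "a.new" is kept, trimmed to its size at the last updateCheckpoint
}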

View File

@ -159,6 +159,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;

View File

@ -308,6 +308,28 @@ void ColumnSparse::popBack(size_t n)
_size = new_size;
}
ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
}
void ColumnSparse::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
values->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
{
_size = checkpoint.size;
const auto & nested = *assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested;
chassert(nested.size > 0);
values->rollback(nested);
getOffsetsData().resize_assume_reserved(nested.size - 1);
}
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
{
if (_size != filt.size())
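The `nested.size - 1` above reflects the ColumnSparse layout: the values column always keeps the default value at index 0, so the offsets array holds exactly values->size() - 1 entries, and the checkpointed values size tells how far to trim the offsets. A hedged gtest-style sketch of that bookkeeping; include paths are assumptions.

#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Field.h>
#include <gtest/gtest.h>

using namespace DB;

TEST(ColumnSparse, RollbackSketch)
{
    auto col = ColumnSparse::create(ColumnUInt64::create());
    col->insert(Field(0u));   /// default row: tracked only by _size, no offset, no value
    col->insert(Field(7u));   /// explicit row: one offset and one appended value

    auto checkpoint = col->getCheckpoint();   /// size = 2 rows, nested (values) size = 2

    col->insert(Field(8u));
    col->rollback(*checkpoint);

    ASSERT_EQ(col->size(), 2);
    ASSERT_EQ(col->getValuesColumn().size(), 2);   /// shared default + one explicit value
}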

View File

@ -149,6 +149,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;

View File

@ -240,6 +240,23 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
return permuteImpl(*this, perm, limit);
}
ColumnCheckpointPtr ColumnString::getCheckpoint() const
{
auto nested = std::make_shared<ColumnCheckpoint>(chars.size());
return std::make_shared<ColumnCheckpointWithNested>(size(), std::move(nested));
}
void ColumnString::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested->size = chars.size();
}
void ColumnString::rollback(const ColumnCheckpoint & checkpoint)
{
offsets.resize_assume_reserved(checkpoint.size);
chars.resize_assume_reserved(assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested->size);
}
void ColumnString::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{
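ColumnString has no nested column, but its chars buffer grows independently of the row count, so the checkpoint above reuses a plain ColumnCheckpoint in the `nested` slot to remember chars.size() in bytes. A hedged gtest-style sketch; include paths are assumptions.

#include <Columns/ColumnString.h>
#include <Core/Field.h>
#include <gtest/gtest.h>

using namespace DB;

TEST(ColumnString, RollbackSketch)
{
    auto col = ColumnString::create();
    col->insert(Field("abc"));

    auto checkpoint = col->getCheckpoint();   /// size = 1 row, nested->size = current byte size of chars

    col->insert(Field("defgh"));
    col->rollback(*checkpoint);               /// trims both offsets and chars

    ASSERT_EQ(col->size(), 1);
    ASSERT_EQ((*col)[0], Field("abc"));
}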

View File

@ -194,6 +194,10 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;

View File

@ -254,6 +254,37 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(columns.size());
for (const auto & column : columns)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
void ColumnTuple::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(checkpoints.size() == columns.size());
checkpoint.size = size();
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->updateCheckpoint(*checkpoints[i]);
}
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
{
column_length = checkpoint.size;
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(columns.size() == checkpoints.size());
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->rollback(*checkpoints[i]);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if (columns.empty())

View File

@ -118,6 +118,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -739,6 +739,39 @@ void ColumnVariant::popBack(size_t n)
offsets->popBack(n);
}
ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(variants.size());
for (const auto & column : variants)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
void ColumnVariant::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(checkpoints.size() == variants.size());
checkpoint.size = size();
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->updateCheckpoint(*checkpoints[i]);
}
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(variants.size() == checkpoints.size());
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->rollback(*checkpoints[i]);
}
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
/// During any serialization/deserialization we should always use global discriminators.

View File

@ -248,6 +248,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -49,6 +49,40 @@ struct EqualRange
using EqualRanges = std::vector<EqualRange>;
/// A checkpoint that contains the size of a column and all its subcolumns.
/// It can be used to roll back the column to a previous state, for example
/// after failed parsing, when the column may be left in an inconsistent state.
struct ColumnCheckpoint
{
size_t size;
explicit ColumnCheckpoint(size_t size_) : size(size_) {}
virtual ~ColumnCheckpoint() = default;
};
using ColumnCheckpointPtr = std::shared_ptr<ColumnCheckpoint>;
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
struct ColumnCheckpointWithNested : public ColumnCheckpoint
{
ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_)
: ColumnCheckpoint(size_), nested(std::move(nested_))
{
}
ColumnCheckpointPtr nested;
};
struct ColumnCheckpointWithMultipleNested : public ColumnCheckpoint
{
ColumnCheckpointWithMultipleNested(size_t size_, ColumnCheckpoints nested_)
: ColumnCheckpoint(size_), nested(std::move(nested_))
{
}
ColumnCheckpoints nested;
};
/// Declares interface to store columns in memory.
class IColumn : public COW<IColumn>
{
@ -509,6 +543,17 @@ public:
/// The operation is slow and performed only for debug builds.
virtual void protect() {}
/// Returns a checkpoint of the current state of the column.
virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared<ColumnCheckpoint>(size()); }
/// Updates the checkpoint with the current state. It is used to avoid extra allocations in 'getCheckpoint'.
virtual void updateCheckpoint(ColumnCheckpoint & checkpoint) const { checkpoint.size = size(); }
/// Rolls back the column to the checkpoint.
/// Unlike 'popBack', this method should work correctly even if the column is in an invalid state.
/// Sizes of columns in the checkpoint must be less than or equal to the current sizes.
virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); }
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
/// Shallow: doesn't do recursive calls; don't do call for itself.
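The two derived structs above mirror the column hierarchy: wrapper columns such as Nullable, Array and Sparse use ColumnCheckpointWithNested, multi-child columns such as Tuple and Variant use ColumnCheckpointWithMultipleNested, and each level records its own size. A hedged gtest-style sketch of what that tree looks like for a Nullable(String) column, in the spirit of the unit tests added later in this diff; include paths are assumptions.

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <gtest/gtest.h>

using namespace DB;

TEST(ColumnCheckpoint, NestedStructureSketch)
{
    auto col = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create());
    col->insert(Field("abc"));
    col->insert(Field());   /// NULL

    ColumnCheckpointPtr checkpoint = col->getCheckpoint();

    /// Top level: row count of the Nullable column plus a checkpoint of the nested column.
    const auto & nullable_cp = assert_cast<const ColumnCheckpointWithNested &>(*checkpoint);
    ASSERT_EQ(nullable_cp.size, 2);

    /// Next level: the String column's checkpoint; its own `nested` slot tracks the byte
    /// size of the chars buffer rather than a subcolumn.
    const auto & string_cp = assert_cast<const ColumnCheckpointWithNested &>(*nullable_cp.nested);
    ASSERT_EQ(string_cp.size, 2);
    ASSERT_GT(string_cp.nested->size, 0);
}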

View File

@ -920,3 +920,72 @@ TEST(ColumnDynamic, compare)
ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1);
ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1);
}
TEST(ColumnDynamic, rollback)
{
auto check_variant = [](const ColumnVariant & column_variant, std::vector<size_t> sizes)
{
ASSERT_EQ(column_variant.getNumVariants(), sizes.size());
size_t num_rows = 0;
for (size_t i = 0; i < sizes.size(); ++i)
{
ASSERT_EQ(column_variant.getVariants()[i]->size(), sizes[i]);
num_rows += sizes[i];
}
ASSERT_EQ(num_rows, column_variant.size());
};
auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::vector<size_t> sizes)
{
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(cp).nested;
ASSERT_EQ(nested.size(), sizes.size());
size_t num_rows = 0;
for (size_t i = 0; i < sizes.size(); ++i)
{
ASSERT_EQ(nested[i]->size, sizes[i]);
num_rows += sizes[i];
}
ASSERT_EQ(num_rows, cp.size);
};
std::vector<std::pair<ColumnCheckpointPtr, std::vector<size_t>>> checkpoints;
auto column = ColumnDynamic::create(2);
auto checkpoint = column->getCheckpoint();
column->insert(Field(42));
column->updateCheckpoint(*checkpoint);
checkpoints.emplace_back(checkpoint, std::vector<size_t>{0, 1});
column->insert(Field("str1"));
column->rollback(*checkpoint);
check_checkpoint(*checkpoint, checkpoints.back().second);
check_variant(column->getVariantColumn(), checkpoints.back().second);
column->insert("str1");
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 1});
column->insert("str2");
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 2});
column->insert(Array({1, 2}));
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{1, 1, 2});
column->insert(Field(42.42));
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{2, 1, 2});
for (const auto & [cp, sizes] : checkpoints)
{
auto column_copy = column->clone();
column_copy->rollback(*cp);
check_checkpoint(*cp, sizes);
check_variant(assert_cast<const ColumnDynamic &>(*column_copy).getVariantColumn(), sizes);
}
}

View File

@ -5,6 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <Common/Arena.h>
#include "Core/Field.h"
#include <gtest/gtest.h>
using namespace DB;
@ -349,3 +350,65 @@ TEST(ColumnObject, SkipSerializedInArena)
pos = col2->skipSerializedInArena(pos);
ASSERT_EQ(pos, end);
}
TEST(ColumnObject, rollback)
{
auto type = DataTypeFactory::instance().get("JSON(max_dynamic_types=10, max_dynamic_paths=2, a.a UInt32, a.b UInt32)");
auto col = type->createColumn();
auto & col_object = assert_cast<ColumnObject &>(*col);
const auto & typed_paths = col_object.getTypedPaths();
const auto & dynamic_paths = col_object.getDynamicPaths();
const auto & shared_data = col_object.getSharedDataColumn();
auto assert_sizes = [&](size_t size)
{
for (const auto & [name, column] : typed_paths)
ASSERT_EQ(column->size(), size);
for (const auto & [name, column] : dynamic_paths)
ASSERT_EQ(column->size(), size);
ASSERT_EQ(shared_data.size(), size);
};
auto checkpoint = col_object.getCheckpoint();
col_object.insert(Object{{"a.a", Field{1u}}});
col_object.updateCheckpoint(*checkpoint);
col_object.insert(Object{{"a.b", Field{2u}}});
col_object.insert(Object{{"a.a", Field{3u}}});
col_object.rollback(*checkpoint);
assert_sizes(1);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 0);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*typed_paths.at("a.b"))[0], Field{0u});
col_object.insert(Object{{"a.c", Field{"ccc"}}});
checkpoint = col_object.getCheckpoint();
col_object.insert(Object{{"a.d", Field{"ddd"}}});
col_object.insert(Object{{"a.e", Field{"eee"}}});
assert_sizes(4);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 2);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
ASSERT_EQ((*dynamic_paths.at("a.d"))[2], Field{"ddd"});
col_object.rollback(*checkpoint);
assert_sizes(2);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 1);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
}

View File

@ -1023,15 +1023,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
}
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
current_exception = e.displayText();
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
key.query_str, current_entry->query_id, current_exception);
for (const auto & column : result_columns)
if (column->size() > total_rows)
column->popBack(column->size() - total_rows);
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->rollback(*checkpoints[i]);
current_entry->finish(std::current_exception());
return 0;

View File

@ -22,8 +22,12 @@ StreamingFormatExecutor::StreamingFormatExecutor(
, adding_defaults_transform(std::move(adding_defaults_transform_))
, port(format->getPort().getHeader(), format.get())
, result_columns(header.cloneEmptyColumns())
, checkpoints(result_columns.size())
{
connect(format->getPort(), port);
for (size_t i = 0; i < result_columns.size(); ++i)
checkpoints[i] = result_columns[i]->getCheckpoint();
}
MutableColumns StreamingFormatExecutor::getResultColumns()
@ -53,6 +57,9 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
size_t StreamingFormatExecutor::execute()
{
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->updateCheckpoint(*checkpoints[i]);
try
{
size_t new_rows = 0;
@ -85,19 +92,19 @@ size_t StreamingFormatExecutor::execute()
catch (Exception & e)
{
format->resetParser();
return on_error(result_columns, e);
return on_error(result_columns, checkpoints, e);
}
catch (std::exception & e)
{
format->resetParser();
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
return on_error(result_columns, exception);
return on_error(result_columns, checkpoints, exception);
}
catch (...)
{
format->resetParser();
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName());
return on_error(result_columns, exception);
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName());
return on_error(result_columns, checkpoints, exception);
}
}
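Callers of StreamingFormatExecutor now receive the per-column checkpoints in the error callback instead of trimming with popBack and a row counter, as the storage sources further down in this diff do. A hedged sketch of wiring such a callback; the header path and the surrounding setup are assumptions, and error bookkeeping is elided.

#include <Processors/Executors/StreamingFormatExecutor.h>

using namespace DB;

size_t executeTolerantly(const Block & header, InputFormatPtr format, ReadBuffer & buf)
{
    auto on_error = [](const MutableColumns & result_columns,
                       const ColumnCheckpoints & checkpoints,
                       Exception &) -> size_t
    {
        /// Roll back to the state captured at the start of this execute() call:
        /// rows from earlier calls are kept, everything from the failed call is dropped.
        for (size_t i = 0; i < result_columns.size(); ++i)
            result_columns[i]->rollback(*checkpoints[i]);
        return 0;   /// the callback added no new rows
    };

    StreamingFormatExecutor executor(header, std::move(format), std::move(on_error));
    return executor.execute(buf);
}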

View File

@ -19,12 +19,12 @@ public:
/// and exception to rethrow it or add context to it.
/// Should return number of new rows, which are added in callback
/// to result columns in comparison to previous call of `execute`.
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
using ErrorCallback = std::function<size_t(const MutableColumns &, const ColumnCheckpoints &, Exception &)>;
StreamingFormatExecutor(
const Block & header_,
InputFormatPtr format_,
ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); },
ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints &, Exception & e) -> size_t { throw std::move(e); },
SimpleTransformPtr adding_defaults_transform_ = nullptr);
/// Returns numbers of new read rows.
@ -50,6 +50,7 @@ private:
InputPort port;
MutableColumns result_columns;
ColumnCheckpoints checkpoints;
};
}

View File

@ -105,6 +105,10 @@ Chunk IRowInputFormat::read()
size_t num_columns = header.columns();
MutableColumns columns = header.cloneEmptyColumns();
ColumnCheckpoints checkpoints(columns.size());
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
checkpoints[column_idx] = columns[column_idx]->getCheckpoint();
block_missing_values.clear();
size_t num_rows = 0;
@ -130,6 +134,9 @@ Chunk IRowInputFormat::read()
{
try
{
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
columns[column_idx]->updateCheckpoint(*checkpoints[column_idx]);
info.read_columns.clear();
continue_reading = readRow(columns, info);
@ -193,14 +200,9 @@ Chunk IRowInputFormat::read()
syncAfterError();
/// Truncate all columns in block to initial size (remove values, that was appended to only part of columns).
/// Roll back all columns in the block to their initial size (remove values that were appended to only part of the columns).
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
auto & column = columns[column_idx];
if (column->size() > num_rows)
column->popBack(column->size() - num_rows);
}
columns[column_idx]->rollback(*checkpoints[column_idx]);
}
}
}
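The read() hunks above are fragmented, so here is a condensed, hedged sketch of the loop shape they introduce: per-column checkpoints refreshed before every row, and a joint rollback when a row fails after filling only some of the columns. `read_row` stands in for readRow(); error accounting and block_missing_values handling are elided, and the include path is an assumption.

#include <Columns/IColumn.h>

using namespace DB;

template <typename ReadRow>
size_t readRowsWithRollback(MutableColumns & columns, size_t max_rows, ReadRow && read_row)
{
    ColumnCheckpoints checkpoints(columns.size());
    for (size_t i = 0; i < columns.size(); ++i)
        checkpoints[i] = columns[i]->getCheckpoint();

    size_t num_rows = 0;
    while (num_rows < max_rows)
    {
        /// Remember the state of every column before attempting the next row.
        for (size_t i = 0; i < columns.size(); ++i)
            columns[i]->updateCheckpoint(*checkpoints[i]);

        try
        {
            if (!read_row(columns))
                break;
            ++num_rows;
        }
        catch (...)
        {
            /// Remove values that were appended to only part of the columns.
            for (size_t i = 0; i < columns.size(); ++i)
                columns[i]->rollback(*checkpoints[i]);
            throw;   /// the real format counts tolerated errors before rethrowing
        }
    }
    return num_rows;
}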

View File

@ -86,21 +86,18 @@ Chunk FileLogSource::generate()
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -108,23 +108,20 @@ Chunk KafkaSource::generateImpl()
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -817,23 +817,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -102,21 +102,18 @@ Chunk NATSSource::generate()
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -161,21 +161,18 @@ Chunk RabbitMQSource::generateImpl()
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS async_inserts_native;
CREATE TABLE async_inserts_native (m Map(UInt64, UInt64), v UInt64 MATERIALIZED m[4]) ENGINE = Memory;
"
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=1000&async_insert_busy_timeout_min_ms=1000&wait_for_async_insert=1"
# This test runs inserts with memory_tracker_fault_probability > 0 to trigger memory limit errors during insertion.
# If the rollback of columns is wrong in that case, it may produce a LOGICAL_ERROR, and it will be caught by the termination of the server in debug mode.
for _ in {1..10}; do
${CLICKHOUSE_CLIENT} -q "SELECT (range(number), range(number))::Map(UInt64, UInt64) AS m FROM numbers(1000) FORMAT Native" | \
${CLICKHOUSE_CURL} -sS -X POST "${url}&max_block_size=100&memory_tracker_fault_probability=0.01&query=INSERT+INTO+async_inserts_native+FORMAT+Native" --data-binary @- >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_native;"

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data
$CLICKHOUSE_LOCAL -q "select tuple(1, x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Tuple(UInt32, IPv6)') settings input_format_allow_errors_num=1"
$CLICKHOUSE_LOCAL -q "select [x'00000000000000000000FFFF00000000', x'00000000000000000000FFFF0000000000'] as x format BSONEachRow" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Array(IPv6)') settings input_format_allow_errors_num=1"
$CLICKHOUSE_LOCAL -q "select map('key1', x'00000000000000000000FFFF00000000', 'key2', x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Map(String, IPv6)') settings input_format_allow_errors_num=1"
rm $DATA_FILE