diff --git a/src/Columns/ColumnArray.cpp b/src/Columns/ColumnArray.cpp index 83d4c24c769..0c6d7c4e5c6 100644 --- a/src/Columns/ColumnArray.cpp +++ b/src/Columns/ColumnArray.cpp @@ -369,6 +369,23 @@ void ColumnArray::popBack(size_t n) offsets_data.resize_assume_reserved(offsets_data.size() - n); } +ColumnCheckpointPtr ColumnArray::getCheckpoint() const +{ + return std::make_shared(size(), getData().getCheckpoint()); +} + +void ColumnArray::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + checkpoint.size = size(); + getData().updateCheckpoint(*assert_cast(checkpoint).nested); +} + +void ColumnArray::rollback(const ColumnCheckpoint & checkpoint) +{ + getOffsets().resize_assume_reserved(checkpoint.size); + getData().rollback(*assert_cast(checkpoint).nested); +} + int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const { const ColumnArray & rhs = assert_cast(rhs_); diff --git a/src/Columns/ColumnArray.h b/src/Columns/ColumnArray.h index f77268a8be6..ec14b096055 100644 --- a/src/Columns/ColumnArray.h +++ b/src/Columns/ColumnArray.h @@ -161,6 +161,10 @@ public: ColumnPtr compress() const override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; + void forEachSubcolumn(MutableColumnCallback callback) override { callback(offsets); diff --git a/src/Columns/ColumnDynamic.cpp b/src/Columns/ColumnDynamic.cpp index a0a9bda7678..45501903a66 100644 --- a/src/Columns/ColumnDynamic.cpp +++ b/src/Columns/ColumnDynamic.cpp @@ -979,6 +979,92 @@ ColumnPtr ColumnDynamic::compress() const }); } +void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + auto & nested = assert_cast(checkpoint).nested; + const auto & variants = variant_column_ptr->getVariants(); + + size_t old_size = nested.size(); + chassert(old_size <= variants.size()); + + for (size_t i = 0; i < old_size; ++i) + { + variants[i]->updateCheckpoint(*nested[i]); + } + + /// If column has new variants since last checkpoint create checkpoints for them. + if (old_size < variants.size()) + { + nested.resize(variants.size()); + for (size_t i = old_size; i < variants.size(); ++i) + nested[i] = variants[i]->getCheckpoint(); + } + + checkpoint.size = size(); +} + + +DataTypePtr ColumnDynamic::popBackVariants(const VariantInfo & info, const std::vector & local_to_global_discriminators, size_t n) +{ + const auto & type_variant = assert_cast(*info.variant_type); + + std::unordered_map discriminator_to_name; + std::unordered_map name_to_data_type; + + for (const auto & [name, discriminator] : info.variant_name_to_discriminator) + discriminator_to_name.emplace(discriminator, name); + + for (const auto & type : type_variant.getVariants()) + name_to_data_type.emplace(type->getName(), type); + + /// Remove last n variants according to global discriminators. + /// This code relies on invariant that new variants are always added to the end in ColumnVariant. + for (auto it = local_to_global_discriminators.rbegin(); it < local_to_global_discriminators.rbegin() + n; ++it) + discriminator_to_name.erase(*it); + + DataTypes new_variants; + for (const auto & [d, name] : discriminator_to_name) + new_variants.push_back(name_to_data_type.at(name)); + + return std::make_shared(std::move(new_variants)); +} + +void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint) +{ + const auto & nested = assert_cast(checkpoint).nested; + chassert(nested.size() <= variant_column_ptr->getNumVariants()); + + /// The structure hasn't changed, so we can use generic rollback of Variant column + if (nested.size() == variant_column_ptr->getNumVariants()) + { + variant_column_ptr->rollback(checkpoint); + return; + } + + auto new_subcolumns = variant_column_ptr->getVariants(); + auto new_discriminators_map = variant_column_ptr->getLocalToGlobalDiscriminatorsMapping(); + auto new_discriminators_column = variant_column_ptr->getLocalDiscriminatorsPtr(); + auto new_offses_column = variant_column_ptr->getOffsetsPtr(); + + /// Remove new variants that were added since last checkpoint. + auto new_variant_type = popBackVariants(variant_info, new_discriminators_map, variant_column_ptr->getNumVariants() - nested.size()); + createVariantInfo(new_variant_type); + variant_mappings_cache.clear(); + + new_subcolumns.resize(nested.size()); + new_discriminators_map.resize(nested.size()); + + /// Manually rollback internals of Variant column + new_discriminators_column->assumeMutable()->popBack(new_discriminators_column->size() - checkpoint.size); + new_offses_column->assumeMutable()->popBack(new_offses_column->size() - checkpoint.size); + + for (size_t i = 0; i < nested.size(); ++i) + new_subcolumns[i]->rollback(*nested[i]); + + variant_column = ColumnVariant::create(new_discriminators_column, new_offses_column, Columns(new_subcolumns.begin(), new_subcolumns.end()), new_discriminators_map); + variant_column_ptr = assert_cast(variant_column.get()); +} + String ColumnDynamic::getTypeNameAt(size_t row_num) const { const auto & variant_col = getVariantColumn(); diff --git a/src/Columns/ColumnDynamic.h b/src/Columns/ColumnDynamic.h index 17b0d80e5eb..8d8097905bf 100644 --- a/src/Columns/ColumnDynamic.h +++ b/src/Columns/ColumnDynamic.h @@ -304,6 +304,15 @@ public: variant_column_ptr->protect(); } + ColumnCheckpointPtr getCheckpoint() const override + { + return variant_column_ptr->getCheckpoint(); + } + + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + + void rollback(const ColumnCheckpoint & checkpoint) override; + void forEachSubcolumn(MutableColumnCallback callback) override { callback(variant_column); @@ -444,6 +453,8 @@ private: void updateVariantInfoAndExpandVariantColumn(const DataTypePtr & new_variant_type); + static DataTypePtr popBackVariants(const VariantInfo & info, const std::vector & local_to_global_discriminators, size_t n); + WrappedPtr variant_column; /// Store and use pointer to ColumnVariant to avoid virtual calls. /// ColumnDynamic is widely used inside ColumnObject for each path and diff --git a/src/Columns/ColumnMap.cpp b/src/Columns/ColumnMap.cpp index 536da4d06d0..7ebbed930d8 100644 --- a/src/Columns/ColumnMap.cpp +++ b/src/Columns/ColumnMap.cpp @@ -312,6 +312,21 @@ void ColumnMap::getExtremes(Field & min, Field & max) const max = std::move(map_max_value); } +ColumnCheckpointPtr ColumnMap::getCheckpoint() const +{ + return nested->getCheckpoint(); +} + +void ColumnMap::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + nested->updateCheckpoint(checkpoint); +} + +void ColumnMap::rollback(const ColumnCheckpoint & checkpoint) +{ + nested->rollback(checkpoint); +} + void ColumnMap::forEachSubcolumn(MutableColumnCallback callback) { callback(nested); diff --git a/src/Columns/ColumnMap.h b/src/Columns/ColumnMap.h index 39d15a586b9..575114f8d3a 100644 --- a/src/Columns/ColumnMap.h +++ b/src/Columns/ColumnMap.h @@ -102,6 +102,9 @@ public: size_t byteSizeAt(size_t n) const override; size_t allocatedBytes() const override; void protect() override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; void forEachSubcolumn(MutableColumnCallback callback) override; void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override; bool structureEquals(const IColumn & rhs) const override; diff --git a/src/Columns/ColumnNullable.cpp b/src/Columns/ColumnNullable.cpp index ec375ea5a8d..61feca60e42 100644 --- a/src/Columns/ColumnNullable.cpp +++ b/src/Columns/ColumnNullable.cpp @@ -305,6 +305,23 @@ void ColumnNullable::popBack(size_t n) getNullMapColumn().popBack(n); } +ColumnCheckpointPtr ColumnNullable::getCheckpoint() const +{ + return std::make_shared(size(), nested_column->getCheckpoint()); +} + +void ColumnNullable::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + checkpoint.size = size(); + nested_column->updateCheckpoint(*assert_cast(checkpoint).nested); +} + +void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint) +{ + getNullMapData().resize_assume_reserved(checkpoint.size); + nested_column->rollback(*assert_cast(checkpoint).nested); +} + ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const { ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint); diff --git a/src/Columns/ColumnNullable.h b/src/Columns/ColumnNullable.h index 78274baca51..32ce66c5965 100644 --- a/src/Columns/ColumnNullable.h +++ b/src/Columns/ColumnNullable.h @@ -143,6 +143,10 @@ public: ColumnPtr compress() const override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; + void forEachSubcolumn(MutableColumnCallback callback) override { callback(nested_column); diff --git a/src/Columns/ColumnObject.cpp b/src/Columns/ColumnObject.cpp index e397b03b69e..c1b31731147 100644 --- a/src/Columns/ColumnObject.cpp +++ b/src/Columns/ColumnObject.cpp @@ -30,6 +30,23 @@ const std::shared_ptr & getDynamicSerialization() return dynamic_serialization; } +struct ColumnObjectCheckpoint : public ColumnCheckpoint +{ + using CheckpointsMap = std::unordered_map; + + ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_) + : ColumnCheckpoint(size_) + , typed_paths(std::move(typed_paths_)) + , dynamic_paths(std::move(dynamic_paths_)) + , shared_data(std::move(shared_data_)) + { + } + + CheckpointsMap typed_paths; + CheckpointsMap dynamic_paths; + ColumnCheckpointPtr shared_data; +}; + } ColumnObject::ColumnObject( @@ -655,6 +672,69 @@ void ColumnObject::popBack(size_t n) shared_data->popBack(n); } +ColumnCheckpointPtr ColumnObject::getCheckpoint() const +{ + auto get_checkpoints = [](const auto & columns) + { + std::unordered_map checkpoints; + for (const auto & [name, column] : columns) + checkpoints[name] = column->getCheckpoint(); + + return checkpoints; + }; + + return std::make_shared(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint()); +} + +void ColumnObject::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + auto & object_checkpoint = assert_cast(checkpoint); + + auto update_checkpoints = [&](const auto & columns_map, auto & checkpoints_map) + { + for (const auto & [name, column] : columns_map) + { + auto & nested = checkpoints_map[name]; + if (!nested) + nested = column->getCheckpoint(); + else + column->updateCheckpoint(*nested); + } + }; + + checkpoint.size = size(); + update_checkpoints(typed_paths, object_checkpoint.typed_paths); + update_checkpoints(dynamic_paths, object_checkpoint.dynamic_paths); + shared_data->updateCheckpoint(*object_checkpoint.shared_data); +} + +void ColumnObject::rollback(const ColumnCheckpoint & checkpoint) +{ + const auto & object_checkpoint = assert_cast(checkpoint); + + auto rollback_columns = [&](auto & columns_map, const auto & checkpoints_map) + { + NameSet names_to_remove; + + /// Rollback subcolumns and remove paths that were not in checkpoint. + for (auto & [name, column] : columns_map) + { + auto it = checkpoints_map.find(name); + if (it == checkpoints_map.end()) + names_to_remove.insert(name); + else + column->rollback(*it->second); + } + + for (const auto & name : names_to_remove) + columns_map.erase(name); + }; + + rollback_columns(typed_paths, object_checkpoint.typed_paths); + rollback_columns(dynamic_paths, object_checkpoint.dynamic_paths); + shared_data->rollback(*object_checkpoint.shared_data); +} + StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const { StringRef res(begin, 0); diff --git a/src/Columns/ColumnObject.h b/src/Columns/ColumnObject.h index f530ed29ef3..e444db099b0 100644 --- a/src/Columns/ColumnObject.h +++ b/src/Columns/ColumnObject.h @@ -159,6 +159,9 @@ public: size_t byteSizeAt(size_t n) const override; size_t allocatedBytes() const override; void protect() override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; void forEachSubcolumn(MutableColumnCallback callback) override; diff --git a/src/Columns/ColumnSparse.cpp b/src/Columns/ColumnSparse.cpp index a908d970a15..a0e47e65fc6 100644 --- a/src/Columns/ColumnSparse.cpp +++ b/src/Columns/ColumnSparse.cpp @@ -308,6 +308,28 @@ void ColumnSparse::popBack(size_t n) _size = new_size; } +ColumnCheckpointPtr ColumnSparse::getCheckpoint() const +{ + return std::make_shared(size(), values->getCheckpoint()); +} + +void ColumnSparse::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + checkpoint.size = size(); + values->updateCheckpoint(*assert_cast(checkpoint).nested); +} + +void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint) +{ + _size = checkpoint.size; + + const auto & nested = *assert_cast(checkpoint).nested; + chassert(nested.size > 0); + + values->rollback(nested); + getOffsetsData().resize_assume_reserved(nested.size - 1); +} + ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const { if (_size != filt.size()) diff --git a/src/Columns/ColumnSparse.h b/src/Columns/ColumnSparse.h index 7a4d914e62a..619dce63c1e 100644 --- a/src/Columns/ColumnSparse.h +++ b/src/Columns/ColumnSparse.h @@ -149,6 +149,10 @@ public: ColumnPtr compress() const override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; + void forEachSubcolumn(MutableColumnCallback callback) override; void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override; diff --git a/src/Columns/ColumnString.cpp b/src/Columns/ColumnString.cpp index 00cf3bd9c30..269c20397b4 100644 --- a/src/Columns/ColumnString.cpp +++ b/src/Columns/ColumnString.cpp @@ -240,6 +240,23 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const return permuteImpl(*this, perm, limit); } +ColumnCheckpointPtr ColumnString::getCheckpoint() const +{ + auto nested = std::make_shared(chars.size()); + return std::make_shared(size(), std::move(nested)); +} + +void ColumnString::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + checkpoint.size = size(); + assert_cast(checkpoint).nested->size = chars.size(); +} + +void ColumnString::rollback(const ColumnCheckpoint & checkpoint) +{ + offsets.resize_assume_reserved(checkpoint.size); + chars.resize_assume_reserved(assert_cast(checkpoint).nested->size); +} void ColumnString::collectSerializedValueSizes(PaddedPODArray & sizes, const UInt8 * is_null) const { diff --git a/src/Columns/ColumnString.h b/src/Columns/ColumnString.h index ec0563b3f00..c2371412437 100644 --- a/src/Columns/ColumnString.h +++ b/src/Columns/ColumnString.h @@ -194,6 +194,10 @@ public: offsets.resize_assume_reserved(offsets.size() - n); } + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; + void collectSerializedValueSizes(PaddedPODArray & sizes, const UInt8 * is_null) const override; StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override; diff --git a/src/Columns/ColumnTuple.cpp b/src/Columns/ColumnTuple.cpp index e741eb51c68..51617359318 100644 --- a/src/Columns/ColumnTuple.cpp +++ b/src/Columns/ColumnTuple.cpp @@ -254,6 +254,37 @@ void ColumnTuple::popBack(size_t n) column->popBack(n); } +ColumnCheckpointPtr ColumnTuple::getCheckpoint() const +{ + ColumnCheckpoints checkpoints; + checkpoints.reserve(columns.size()); + + for (const auto & column : columns) + checkpoints.push_back(column->getCheckpoint()); + + return std::make_shared(size(), std::move(checkpoints)); +} + +void ColumnTuple::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + auto & checkpoints = assert_cast(checkpoint).nested; + chassert(checkpoints.size() == columns.size()); + + checkpoint.size = size(); + for (size_t i = 0; i < columns.size(); ++i) + columns[i]->updateCheckpoint(*checkpoints[i]); +} + +void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint) +{ + column_length = checkpoint.size; + const auto & checkpoints = assert_cast(checkpoint).nested; + + chassert(columns.size() == checkpoints.size()); + for (size_t i = 0; i < columns.size(); ++i) + columns[i]->rollback(*checkpoints[i]); +} + StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const { if (columns.empty()) diff --git a/src/Columns/ColumnTuple.h b/src/Columns/ColumnTuple.h index 6968294aef9..c73f90f13d9 100644 --- a/src/Columns/ColumnTuple.h +++ b/src/Columns/ColumnTuple.h @@ -118,6 +118,9 @@ public: size_t byteSizeAt(size_t n) const override; size_t allocatedBytes() const override; void protect() override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; void forEachSubcolumn(MutableColumnCallback callback) override; void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override; bool structureEquals(const IColumn & rhs) const override; diff --git a/src/Columns/ColumnVariant.cpp b/src/Columns/ColumnVariant.cpp index c6511695f5c..b03313fd6d0 100644 --- a/src/Columns/ColumnVariant.cpp +++ b/src/Columns/ColumnVariant.cpp @@ -739,6 +739,39 @@ void ColumnVariant::popBack(size_t n) offsets->popBack(n); } +ColumnCheckpointPtr ColumnVariant::getCheckpoint() const +{ + ColumnCheckpoints checkpoints; + checkpoints.reserve(variants.size()); + + for (const auto & column : variants) + checkpoints.push_back(column->getCheckpoint()); + + return std::make_shared(size(), std::move(checkpoints)); +} + +void ColumnVariant::updateCheckpoint(ColumnCheckpoint & checkpoint) const +{ + auto & checkpoints = assert_cast(checkpoint).nested; + chassert(checkpoints.size() == variants.size()); + + checkpoint.size = size(); + for (size_t i = 0; i < variants.size(); ++i) + variants[i]->updateCheckpoint(*checkpoints[i]); +} + +void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint) +{ + getOffsets().resize_assume_reserved(checkpoint.size); + getLocalDiscriminators().resize_assume_reserved(checkpoint.size); + + const auto & checkpoints = assert_cast(checkpoint).nested; + chassert(variants.size() == checkpoints.size()); + + for (size_t i = 0; i < variants.size(); ++i) + variants[i]->rollback(*checkpoints[i]); +} + StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const { /// During any serialization/deserialization we should always use global discriminators. diff --git a/src/Columns/ColumnVariant.h b/src/Columns/ColumnVariant.h index 925eab74af8..332c36d1153 100644 --- a/src/Columns/ColumnVariant.h +++ b/src/Columns/ColumnVariant.h @@ -248,6 +248,9 @@ public: size_t byteSizeAt(size_t n) const override; size_t allocatedBytes() const override; void protect() override; + ColumnCheckpointPtr getCheckpoint() const override; + void updateCheckpoint(ColumnCheckpoint & checkpoint) const override; + void rollback(const ColumnCheckpoint & checkpoint) override; void forEachSubcolumn(MutableColumnCallback callback) override; void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override; bool structureEquals(const IColumn & rhs) const override; diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index e4fe233ffdf..95becba3fdb 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -49,6 +49,40 @@ struct EqualRange using EqualRanges = std::vector; +/// A checkpoint that contains size of column and all its subcolumns. +/// It can be used to rollback column to the previous state, for example +/// after failed parsing when column may be in inconsistent state. +struct ColumnCheckpoint +{ + size_t size; + + explicit ColumnCheckpoint(size_t size_) : size(size_) {} + virtual ~ColumnCheckpoint() = default; +}; + +using ColumnCheckpointPtr = std::shared_ptr; +using ColumnCheckpoints = std::vector; + +struct ColumnCheckpointWithNested : public ColumnCheckpoint +{ + ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_) + : ColumnCheckpoint(size_), nested(std::move(nested_)) + { + } + + ColumnCheckpointPtr nested; +}; + +struct ColumnCheckpointWithMultipleNested : public ColumnCheckpoint +{ + ColumnCheckpointWithMultipleNested(size_t size_, ColumnCheckpoints nested_) + : ColumnCheckpoint(size_), nested(std::move(nested_)) + { + } + + ColumnCheckpoints nested; +}; + /// Declares interface to store columns in memory. class IColumn : public COW { @@ -509,6 +543,17 @@ public: /// The operation is slow and performed only for debug builds. virtual void protect() {} + /// Returns checkpoint of current state of column. + virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared(size()); } + + /// Updates the checkpoint with current state. It is used to avoid extra allocations in 'getCheckpoint'. + virtual void updateCheckpoint(ColumnCheckpoint & checkpoint) const { checkpoint.size = size(); } + + /// Rollbacks column to the checkpoint. + /// Unlike 'popBack' this method should work correctly even if column has invalid state. + /// Sizes of columns in checkpoint must be less or equal than current size. + virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); } + /// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them. /// Shallow: doesn't do recursive calls; don't do call for itself. diff --git a/src/Columns/tests/gtest_column_dynamic.cpp b/src/Columns/tests/gtest_column_dynamic.cpp index de76261229d..f956f60b378 100644 --- a/src/Columns/tests/gtest_column_dynamic.cpp +++ b/src/Columns/tests/gtest_column_dynamic.cpp @@ -920,3 +920,72 @@ TEST(ColumnDynamic, compare) ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1); ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1); } + +TEST(ColumnDynamic, rollback) +{ + auto check_variant = [](const ColumnVariant & column_variant, std::vector sizes) + { + ASSERT_EQ(column_variant.getNumVariants(), sizes.size()); + size_t num_rows = 0; + + for (size_t i = 0; i < sizes.size(); ++i) + { + ASSERT_EQ(column_variant.getVariants()[i]->size(), sizes[i]); + num_rows += sizes[i]; + } + + ASSERT_EQ(num_rows, column_variant.size()); + }; + + auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::vector sizes) + { + const auto & nested = assert_cast(cp).nested; + ASSERT_EQ(nested.size(), sizes.size()); + size_t num_rows = 0; + + for (size_t i = 0; i < sizes.size(); ++i) + { + ASSERT_EQ(nested[i]->size, sizes[i]); + num_rows += sizes[i]; + } + + ASSERT_EQ(num_rows, cp.size); + }; + + std::vector>> checkpoints; + + auto column = ColumnDynamic::create(2); + auto checkpoint = column->getCheckpoint(); + + column->insert(Field(42)); + + column->updateCheckpoint(*checkpoint); + checkpoints.emplace_back(checkpoint, std::vector{0, 1}); + + column->insert(Field("str1")); + column->rollback(*checkpoint); + + check_checkpoint(*checkpoint, checkpoints.back().second); + check_variant(column->getVariantColumn(), checkpoints.back().second); + + column->insert("str1"); + checkpoints.emplace_back(column->getCheckpoint(), std::vector{0, 1, 1}); + + column->insert("str2"); + checkpoints.emplace_back(column->getCheckpoint(), std::vector{0, 1, 2}); + + column->insert(Array({1, 2})); + checkpoints.emplace_back(column->getCheckpoint(), std::vector{1, 1, 2}); + + column->insert(Field(42.42)); + checkpoints.emplace_back(column->getCheckpoint(), std::vector{2, 1, 2}); + + for (const auto & [cp, sizes] : checkpoints) + { + auto column_copy = column->clone(); + column_copy->rollback(*cp); + + check_checkpoint(*cp, sizes); + check_variant(assert_cast(*column_copy).getVariantColumn(), sizes); + } +} diff --git a/src/Columns/tests/gtest_column_object.cpp b/src/Columns/tests/gtest_column_object.cpp index f6a1da64ba3..a20bd26fabd 100644 --- a/src/Columns/tests/gtest_column_object.cpp +++ b/src/Columns/tests/gtest_column_object.cpp @@ -5,6 +5,7 @@ #include #include +#include "Core/Field.h" #include using namespace DB; @@ -349,3 +350,65 @@ TEST(ColumnObject, SkipSerializedInArena) pos = col2->skipSerializedInArena(pos); ASSERT_EQ(pos, end); } + +TEST(ColumnObject, rollback) +{ + auto type = DataTypeFactory::instance().get("JSON(max_dynamic_types=10, max_dynamic_paths=2, a.a UInt32, a.b UInt32)"); + auto col = type->createColumn(); + auto & col_object = assert_cast(*col); + const auto & typed_paths = col_object.getTypedPaths(); + const auto & dynamic_paths = col_object.getDynamicPaths(); + const auto & shared_data = col_object.getSharedDataColumn(); + + auto assert_sizes = [&](size_t size) + { + for (const auto & [name, column] : typed_paths) + ASSERT_EQ(column->size(), size); + + for (const auto & [name, column] : dynamic_paths) + ASSERT_EQ(column->size(), size); + + ASSERT_EQ(shared_data.size(), size); + }; + + auto checkpoint = col_object.getCheckpoint(); + + col_object.insert(Object{{"a.a", Field{1u}}}); + col_object.updateCheckpoint(*checkpoint); + + col_object.insert(Object{{"a.b", Field{2u}}}); + col_object.insert(Object{{"a.a", Field{3u}}}); + + col_object.rollback(*checkpoint); + + assert_sizes(1); + ASSERT_EQ(typed_paths.size(), 2); + ASSERT_EQ(dynamic_paths.size(), 0); + + ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u}); + ASSERT_EQ((*typed_paths.at("a.b"))[0], Field{0u}); + + col_object.insert(Object{{"a.c", Field{"ccc"}}}); + + checkpoint = col_object.getCheckpoint(); + + col_object.insert(Object{{"a.d", Field{"ddd"}}}); + col_object.insert(Object{{"a.e", Field{"eee"}}}); + + assert_sizes(4); + ASSERT_EQ(typed_paths.size(), 2); + ASSERT_EQ(dynamic_paths.size(), 2); + + ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u}); + ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"}); + ASSERT_EQ((*dynamic_paths.at("a.d"))[2], Field{"ddd"}); + + col_object.rollback(*checkpoint); + + assert_sizes(2); + ASSERT_EQ(typed_paths.size(), 2); + ASSERT_EQ(dynamic_paths.size(), 1); + + ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u}); + ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"}); +} diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp index 25cd1d0bfa2..6c7deb3cbf1 100644 --- a/src/Interpreters/AsynchronousInsertQueue.cpp +++ b/src/Interpreters/AsynchronousInsertQueue.cpp @@ -1023,15 +1023,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing( adding_defaults_transform = std::make_shared(header, columns, *format, insert_context); } - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e) { current_exception = e.displayText(); LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}", key.query_str, current_entry->query_id, current_exception); - for (const auto & column : result_columns) - if (column->size() > total_rows) - column->popBack(column->size() - total_rows); + for (size_t i = 0; i < result_columns.size(); ++i) + result_columns[i]->rollback(*checkpoints[i]); current_entry->finish(std::current_exception()); return 0; diff --git a/src/Processors/Executors/StreamingFormatExecutor.cpp b/src/Processors/Executors/StreamingFormatExecutor.cpp index 10a7b7fd7f5..2d4b87e9f4d 100644 --- a/src/Processors/Executors/StreamingFormatExecutor.cpp +++ b/src/Processors/Executors/StreamingFormatExecutor.cpp @@ -22,8 +22,12 @@ StreamingFormatExecutor::StreamingFormatExecutor( , adding_defaults_transform(std::move(adding_defaults_transform_)) , port(format->getPort().getHeader(), format.get()) , result_columns(header.cloneEmptyColumns()) + , checkpoints(result_columns.size()) { connect(format->getPort(), port); + + for (size_t i = 0; i < result_columns.size(); ++i) + checkpoints[i] = result_columns[i]->getCheckpoint(); } MutableColumns StreamingFormatExecutor::getResultColumns() @@ -53,6 +57,9 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer) size_t StreamingFormatExecutor::execute() { + for (size_t i = 0; i < result_columns.size(); ++i) + result_columns[i]->updateCheckpoint(*checkpoints[i]); + try { size_t new_rows = 0; @@ -85,19 +92,19 @@ size_t StreamingFormatExecutor::execute() catch (Exception & e) { format->resetParser(); - return on_error(result_columns, e); + return on_error(result_columns, checkpoints, e); } catch (std::exception & e) { format->resetParser(); auto exception = Exception(Exception::CreateFromSTDTag{}, e); - return on_error(result_columns, exception); + return on_error(result_columns, checkpoints, exception); } catch (...) { format->resetParser(); - auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName()); - return on_error(result_columns, exception); + auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName()); + return on_error(result_columns, checkpoints, exception); } } diff --git a/src/Processors/Executors/StreamingFormatExecutor.h b/src/Processors/Executors/StreamingFormatExecutor.h index f159178df8c..3db5a92ae98 100644 --- a/src/Processors/Executors/StreamingFormatExecutor.h +++ b/src/Processors/Executors/StreamingFormatExecutor.h @@ -19,12 +19,12 @@ public: /// and exception to rethrow it or add context to it. /// Should return number of new rows, which are added in callback /// to result columns in comparison to previous call of `execute`. - using ErrorCallback = std::function; + using ErrorCallback = std::function; StreamingFormatExecutor( const Block & header_, InputFormatPtr format_, - ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); }, + ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints, Exception & e) -> size_t { throw std::move(e); }, SimpleTransformPtr adding_defaults_transform_ = nullptr); /// Returns numbers of new read rows. @@ -50,6 +50,7 @@ private: InputPort port; MutableColumns result_columns; + ColumnCheckpoints checkpoints; }; } diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 0b6c81923db..0d65fc3b5fa 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -105,6 +105,10 @@ Chunk IRowInputFormat::read() size_t num_columns = header.columns(); MutableColumns columns = header.cloneEmptyColumns(); + ColumnCheckpoints checkpoints(columns.size()); + for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx) + checkpoints[column_idx] = columns[column_idx]->getCheckpoint(); + block_missing_values.clear(); size_t num_rows = 0; @@ -130,6 +134,9 @@ Chunk IRowInputFormat::read() { try { + for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx) + columns[column_idx]->updateCheckpoint(*checkpoints[column_idx]); + info.read_columns.clear(); continue_reading = readRow(columns, info); @@ -193,14 +200,9 @@ Chunk IRowInputFormat::read() syncAfterError(); - /// Truncate all columns in block to initial size (remove values, that was appended to only part of columns). - + /// Rollback all columns in block to initial size (remove values, that was appended to only part of columns). for (size_t column_idx = 0; column_idx < num_columns; ++column_idx) - { - auto & column = columns[column_idx]; - if (column->size() > num_rows) - column->popBack(column->size() - num_rows); - } + columns[column_idx]->rollback(*checkpoints[column_idx]); } } } diff --git a/src/Storages/FileLog/FileLogSource.cpp b/src/Storages/FileLog/FileLogSource.cpp index eb3ff0436a5..36faa28ac6a 100644 --- a/src/Storages/FileLog/FileLogSource.cpp +++ b/src/Storages/FileLog/FileLogSource.cpp @@ -86,21 +86,18 @@ Chunk FileLogSource::generate() std::optional exception_message; size_t total_rows = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e) { if (handle_error_mode == StreamingHandleErrorMode::STREAM) { exception_message = e.message(); - for (const auto & column : result_columns) + for (size_t i = 0; i < result_columns.size(); ++i) { - // We could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + // We could already push some rows to result_columns before exception, we need to fix it. + result_columns[i]->rollback(*checkpoints[i]); // All data columns will get default value in case of error. - column->insertDefault(); + result_columns[i]->insertDefault(); } return 1; diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index 3ddd0d1be8c..f03d13a2837 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -108,23 +108,20 @@ Chunk KafkaSource::generateImpl() size_t total_rows = 0; size_t failed_poll_attempts = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e) { ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed); if (put_error_to_stream) { exception_message = e.message(); - for (const auto & column : result_columns) + for (size_t i = 0; i < result_columns.size(); ++i) { - // read_kafka_message could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + // We could already push some rows to result_columns before exception, we need to fix it. + result_columns[i]->rollback(*checkpoints[i]); // all data columns will get default value in case of error - column->insertDefault(); + result_columns[i]->insertDefault(); } return 1; diff --git a/src/Storages/Kafka/StorageKafka2.cpp b/src/Storages/Kafka/StorageKafka2.cpp index 3574b46e3b0..3275a38b55a 100644 --- a/src/Storages/Kafka/StorageKafka2.cpp +++ b/src/Storages/Kafka/StorageKafka2.cpp @@ -817,23 +817,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer( size_t total_rows = 0; size_t failed_poll_attempts = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e) { ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed); if (put_error_to_stream) { exception_message = e.message(); - for (const auto & column : result_columns) + for (size_t i = 0; i < result_columns.size(); ++i) { - // read_kafka_message could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + // We could already push some rows to result_columns before exception, we need to fix it. + result_columns[i]->rollback(*checkpoints[i]); // all data columns will get default value in case of error - column->insertDefault(); + result_columns[i]->insertDefault(); } return 1; diff --git a/src/Storages/NATS/NATSSource.cpp b/src/Storages/NATS/NATSSource.cpp index 54f479faacc..bc15e9794cd 100644 --- a/src/Storages/NATS/NATSSource.cpp +++ b/src/Storages/NATS/NATSSource.cpp @@ -102,21 +102,18 @@ Chunk NATSSource::generate() storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); std::optional exception_message; size_t total_rows = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e) { if (handle_error_mode == StreamingHandleErrorMode::STREAM) { exception_message = e.message(); - for (const auto & column : result_columns) + for (size_t i = 0; i < result_columns.size(); ++i) { - // We could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + // We could already push some rows to result_columns before exception, we need to fix it. + result_columns[i]->rollback(*checkpoints[i]); // All data columns will get default value in case of error. - column->insertDefault(); + result_columns[i]->insertDefault(); } return 1; diff --git a/src/Storages/RabbitMQ/RabbitMQSource.cpp b/src/Storages/RabbitMQ/RabbitMQSource.cpp index 15d013245d3..40e85cb06ed 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.cpp +++ b/src/Storages/RabbitMQ/RabbitMQSource.cpp @@ -161,21 +161,18 @@ Chunk RabbitMQSource::generateImpl() std::optional exception_message; size_t total_rows = 0; - auto on_error = [&](const MutableColumns & result_columns, Exception & e) + auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e) { if (handle_error_mode == StreamingHandleErrorMode::STREAM) { exception_message = e.message(); - for (const auto & column : result_columns) + for (size_t i = 0; i < result_columns.size(); ++i) { - // We could already push some rows to result_columns - // before exception, we need to fix it. - auto cur_rows = column->size(); - if (cur_rows > total_rows) - column->popBack(cur_rows - total_rows); + // We could already push some rows to result_columns before exception, we need to fix it. + result_columns[i]->rollback(*checkpoints[i]); // All data columns will get default value in case of error. - column->insertDefault(); + result_columns[i]->insertDefault(); } return 1; diff --git a/tests/queries/0_stateless/03230_async_insert_native.reference b/tests/queries/0_stateless/03230_async_insert_native.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03230_async_insert_native.sh b/tests/queries/0_stateless/03230_async_insert_native.sh new file mode 100755 index 00000000000..5ac3e40fa31 --- /dev/null +++ b/tests/queries/0_stateless/03230_async_insert_native.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} -q " + DROP TABLE IF EXISTS async_inserts_native; + CREATE TABLE async_inserts_native (m Map(UInt64, UInt64), v UInt64 MATERIALIZED m[4]) ENGINE = Memory; +" + +url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=1000&async_insert_busy_timeout_min_ms=1000&wait_for_async_insert=1" + +# This test runs inserts with memory_tracker_fault_probability > 0 to trigger memory limit during insertion. +# If rollback of columns is wrong in that case it may produce LOGICAL_ERROR and it will caught by termintation of server in debug mode. +for _ in {1..10}; do + ${CLICKHOUSE_CLIENT} -q "SELECT (range(number), range(number))::Map(UInt64, UInt64) AS m FROM numbers(1000) FORMAT Native" | \ + ${CLICKHOUSE_CURL} -sS -X POST "${url}&max_block_size=100&memory_tracker_fault_probability=0.01&query=INSERT+INTO+async_inserts_native+FORMAT+Native" --data-binary @- >/dev/null 2>&1 & +done + +wait + +${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_native;" diff --git a/tests/queries/0_stateless/03231_bson_tuple_array_map.reference b/tests/queries/0_stateless/03231_bson_tuple_array_map.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/03231_bson_tuple_array_map.sh b/tests/queries/0_stateless/03231_bson_tuple_array_map.sh new file mode 100755 index 00000000000..600b15fb70a --- /dev/null +++ b/tests/queries/0_stateless/03231_bson_tuple_array_map.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data + +$CLICKHOUSE_LOCAL -q "select tuple(1, x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE +$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Tuple(UInt32, IPv6)') settings input_format_allow_errors_num=1" + +$CLICKHOUSE_LOCAL -q "select [x'00000000000000000000FFFF00000000', x'00000000000000000000FFFF0000000000'] as x format BSONEachRow" > $DATA_FILE +$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Array(IPv6)') settings input_format_allow_errors_num=1" + +$CLICKHOUSE_LOCAL -q "select map('key1', x'00000000000000000000FFFF00000000', 'key2', x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE +$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Map(String, IPv6)') settings input_format_allow_errors_num=1" + +rm $DATA_FILE