mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge cef9eb80d9
into b94a7167a8
This commit is contained in:
commit
5050977d36
@ -369,6 +369,23 @@ void ColumnArray::popBack(size_t n)
|
|||||||
offsets_data.resize_assume_reserved(offsets_data.size() - n);
|
offsets_data.resize_assume_reserved(offsets_data.size() - n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnArray::getCheckpoint() const
|
||||||
|
{
|
||||||
|
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnArray::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
checkpoint.size = size();
|
||||||
|
getData().updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
getOffsets().resize_assume_reserved(checkpoint.size);
|
||||||
|
getData().rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||||
|
}
|
||||||
|
|
||||||
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
|
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
|
||||||
{
|
{
|
||||||
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);
|
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);
|
||||||
|
@ -161,6 +161,10 @@ public:
|
|||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress() const override;
|
||||||
|
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
|
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override
|
void forEachSubcolumn(MutableColumnCallback callback) override
|
||||||
{
|
{
|
||||||
callback(offsets);
|
callback(offsets);
|
||||||
|
@ -979,6 +979,92 @@ ColumnPtr ColumnDynamic::compress() const
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
auto & nested = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
|
||||||
|
const auto & variants = variant_column_ptr->getVariants();
|
||||||
|
|
||||||
|
size_t old_size = nested.size();
|
||||||
|
chassert(old_size <= variants.size());
|
||||||
|
|
||||||
|
for (size_t i = 0; i < old_size; ++i)
|
||||||
|
{
|
||||||
|
variants[i]->updateCheckpoint(*nested[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If column has new variants since last checkpoint create checkpoints for them.
|
||||||
|
if (old_size < variants.size())
|
||||||
|
{
|
||||||
|
nested.resize(variants.size());
|
||||||
|
for (size_t i = old_size; i < variants.size(); ++i)
|
||||||
|
nested[i] = variants[i]->getCheckpoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
checkpoint.size = size();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
DataTypePtr ColumnDynamic::popBackVariants(const VariantInfo & info, const std::vector<ColumnVariant::Discriminator> & local_to_global_discriminators, size_t n)
|
||||||
|
{
|
||||||
|
const auto & type_variant = assert_cast<const DataTypeVariant &>(*info.variant_type);
|
||||||
|
|
||||||
|
std::unordered_map<ColumnVariant::Discriminator, String> discriminator_to_name;
|
||||||
|
std::unordered_map<String, DataTypePtr> name_to_data_type;
|
||||||
|
|
||||||
|
for (const auto & [name, discriminator] : info.variant_name_to_discriminator)
|
||||||
|
discriminator_to_name.emplace(discriminator, name);
|
||||||
|
|
||||||
|
for (const auto & type : type_variant.getVariants())
|
||||||
|
name_to_data_type.emplace(type->getName(), type);
|
||||||
|
|
||||||
|
/// Remove last n variants according to global discriminators.
|
||||||
|
/// This code relies on invariant that new variants are always added to the end in ColumnVariant.
|
||||||
|
for (auto it = local_to_global_discriminators.rbegin(); it < local_to_global_discriminators.rbegin() + n; ++it)
|
||||||
|
discriminator_to_name.erase(*it);
|
||||||
|
|
||||||
|
DataTypes new_variants;
|
||||||
|
for (const auto & [d, name] : discriminator_to_name)
|
||||||
|
new_variants.push_back(name_to_data_type.at(name));
|
||||||
|
|
||||||
|
return std::make_shared<DataTypeVariant>(std::move(new_variants));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
|
||||||
|
chassert(nested.size() <= variant_column_ptr->getNumVariants());
|
||||||
|
|
||||||
|
/// The structure hasn't changed, so we can use generic rollback of Variant column
|
||||||
|
if (nested.size() == variant_column_ptr->getNumVariants())
|
||||||
|
{
|
||||||
|
variant_column_ptr->rollback(checkpoint);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto new_subcolumns = variant_column_ptr->getVariants();
|
||||||
|
auto new_discriminators_map = variant_column_ptr->getLocalToGlobalDiscriminatorsMapping();
|
||||||
|
auto new_discriminators_column = variant_column_ptr->getLocalDiscriminatorsPtr();
|
||||||
|
auto new_offses_column = variant_column_ptr->getOffsetsPtr();
|
||||||
|
|
||||||
|
/// Remove new variants that were added since last checkpoint.
|
||||||
|
auto new_variant_type = popBackVariants(variant_info, new_discriminators_map, variant_column_ptr->getNumVariants() - nested.size());
|
||||||
|
createVariantInfo(new_variant_type);
|
||||||
|
variant_mappings_cache.clear();
|
||||||
|
|
||||||
|
new_subcolumns.resize(nested.size());
|
||||||
|
new_discriminators_map.resize(nested.size());
|
||||||
|
|
||||||
|
/// Manually rollback internals of Variant column
|
||||||
|
new_discriminators_column->assumeMutable()->popBack(new_discriminators_column->size() - checkpoint.size);
|
||||||
|
new_offses_column->assumeMutable()->popBack(new_offses_column->size() - checkpoint.size);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < nested.size(); ++i)
|
||||||
|
new_subcolumns[i]->rollback(*nested[i]);
|
||||||
|
|
||||||
|
variant_column = ColumnVariant::create(new_discriminators_column, new_offses_column, Columns(new_subcolumns.begin(), new_subcolumns.end()), new_discriminators_map);
|
||||||
|
variant_column_ptr = assert_cast<ColumnVariant *>(variant_column.get());
|
||||||
|
}
|
||||||
|
|
||||||
String ColumnDynamic::getTypeNameAt(size_t row_num) const
|
String ColumnDynamic::getTypeNameAt(size_t row_num) const
|
||||||
{
|
{
|
||||||
const auto & variant_col = getVariantColumn();
|
const auto & variant_col = getVariantColumn();
|
||||||
|
@ -304,6 +304,15 @@ public:
|
|||||||
variant_column_ptr->protect();
|
variant_column_ptr->protect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override
|
||||||
|
{
|
||||||
|
return variant_column_ptr->getCheckpoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
|
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override
|
void forEachSubcolumn(MutableColumnCallback callback) override
|
||||||
{
|
{
|
||||||
callback(variant_column);
|
callback(variant_column);
|
||||||
@ -444,6 +453,8 @@ private:
|
|||||||
|
|
||||||
void updateVariantInfoAndExpandVariantColumn(const DataTypePtr & new_variant_type);
|
void updateVariantInfoAndExpandVariantColumn(const DataTypePtr & new_variant_type);
|
||||||
|
|
||||||
|
static DataTypePtr popBackVariants(const VariantInfo & info, const std::vector<ColumnVariant::Discriminator> & local_to_global_discriminators, size_t n);
|
||||||
|
|
||||||
WrappedPtr variant_column;
|
WrappedPtr variant_column;
|
||||||
/// Store and use pointer to ColumnVariant to avoid virtual calls.
|
/// Store and use pointer to ColumnVariant to avoid virtual calls.
|
||||||
/// ColumnDynamic is widely used inside ColumnObject for each path and
|
/// ColumnDynamic is widely used inside ColumnObject for each path and
|
||||||
|
@ -312,6 +312,21 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
|
|||||||
max = std::move(map_max_value);
|
max = std::move(map_max_value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnMap::getCheckpoint() const
|
||||||
|
{
|
||||||
|
return nested->getCheckpoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnMap::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
nested->updateCheckpoint(checkpoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
nested->rollback(checkpoint);
|
||||||
|
}
|
||||||
|
|
||||||
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
|
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
|
||||||
{
|
{
|
||||||
callback(nested);
|
callback(nested);
|
||||||
|
@ -102,6 +102,9 @@ public:
|
|||||||
size_t byteSizeAt(size_t n) const override;
|
size_t byteSizeAt(size_t n) const override;
|
||||||
size_t allocatedBytes() const override;
|
size_t allocatedBytes() const override;
|
||||||
void protect() override;
|
void protect() override;
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||||
bool structureEquals(const IColumn & rhs) const override;
|
bool structureEquals(const IColumn & rhs) const override;
|
||||||
|
@ -305,6 +305,23 @@ void ColumnNullable::popBack(size_t n)
|
|||||||
getNullMapColumn().popBack(n);
|
getNullMapColumn().popBack(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
|
||||||
|
{
|
||||||
|
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnNullable::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
checkpoint.size = size();
|
||||||
|
nested_column->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
getNullMapData().resize_assume_reserved(checkpoint.size);
|
||||||
|
nested_column->rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||||
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
|
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
|
||||||
{
|
{
|
||||||
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);
|
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);
|
||||||
|
@ -143,6 +143,10 @@ public:
|
|||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress() const override;
|
||||||
|
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
|
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override
|
void forEachSubcolumn(MutableColumnCallback callback) override
|
||||||
{
|
{
|
||||||
callback(nested_column);
|
callback(nested_column);
|
||||||
|
@ -30,6 +30,23 @@ const std::shared_ptr<SerializationDynamic> & getDynamicSerialization()
|
|||||||
return dynamic_serialization;
|
return dynamic_serialization;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ColumnObjectCheckpoint : public ColumnCheckpoint
|
||||||
|
{
|
||||||
|
using CheckpointsMap = std::unordered_map<String, ColumnCheckpointPtr>;
|
||||||
|
|
||||||
|
ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_)
|
||||||
|
: ColumnCheckpoint(size_)
|
||||||
|
, typed_paths(std::move(typed_paths_))
|
||||||
|
, dynamic_paths(std::move(dynamic_paths_))
|
||||||
|
, shared_data(std::move(shared_data_))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckpointsMap typed_paths;
|
||||||
|
CheckpointsMap dynamic_paths;
|
||||||
|
ColumnCheckpointPtr shared_data;
|
||||||
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ColumnObject::ColumnObject(
|
ColumnObject::ColumnObject(
|
||||||
@ -655,6 +672,69 @@ void ColumnObject::popBack(size_t n)
|
|||||||
shared_data->popBack(n);
|
shared_data->popBack(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnObject::getCheckpoint() const
|
||||||
|
{
|
||||||
|
auto get_checkpoints = [](const auto & columns)
|
||||||
|
{
|
||||||
|
std::unordered_map<String, ColumnCheckpointPtr> checkpoints;
|
||||||
|
for (const auto & [name, column] : columns)
|
||||||
|
checkpoints[name] = column->getCheckpoint();
|
||||||
|
|
||||||
|
return checkpoints;
|
||||||
|
};
|
||||||
|
|
||||||
|
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnObject::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
auto & object_checkpoint = assert_cast<ColumnObjectCheckpoint &>(checkpoint);
|
||||||
|
|
||||||
|
auto update_checkpoints = [&](const auto & columns_map, auto & checkpoints_map)
|
||||||
|
{
|
||||||
|
for (const auto & [name, column] : columns_map)
|
||||||
|
{
|
||||||
|
auto & nested = checkpoints_map[name];
|
||||||
|
if (!nested)
|
||||||
|
nested = column->getCheckpoint();
|
||||||
|
else
|
||||||
|
column->updateCheckpoint(*nested);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
checkpoint.size = size();
|
||||||
|
update_checkpoints(typed_paths, object_checkpoint.typed_paths);
|
||||||
|
update_checkpoints(dynamic_paths, object_checkpoint.dynamic_paths);
|
||||||
|
shared_data->updateCheckpoint(*object_checkpoint.shared_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
|
||||||
|
|
||||||
|
auto rollback_columns = [&](auto & columns_map, const auto & checkpoints_map)
|
||||||
|
{
|
||||||
|
NameSet names_to_remove;
|
||||||
|
|
||||||
|
/// Rollback subcolumns and remove paths that were not in checkpoint.
|
||||||
|
for (auto & [name, column] : columns_map)
|
||||||
|
{
|
||||||
|
auto it = checkpoints_map.find(name);
|
||||||
|
if (it == checkpoints_map.end())
|
||||||
|
names_to_remove.insert(name);
|
||||||
|
else
|
||||||
|
column->rollback(*it->second);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto & name : names_to_remove)
|
||||||
|
columns_map.erase(name);
|
||||||
|
};
|
||||||
|
|
||||||
|
rollback_columns(typed_paths, object_checkpoint.typed_paths);
|
||||||
|
rollback_columns(dynamic_paths, object_checkpoint.dynamic_paths);
|
||||||
|
shared_data->rollback(*object_checkpoint.shared_data);
|
||||||
|
}
|
||||||
|
|
||||||
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
|
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
|
||||||
{
|
{
|
||||||
StringRef res(begin, 0);
|
StringRef res(begin, 0);
|
||||||
|
@ -159,6 +159,9 @@ public:
|
|||||||
size_t byteSizeAt(size_t n) const override;
|
size_t byteSizeAt(size_t n) const override;
|
||||||
size_t allocatedBytes() const override;
|
size_t allocatedBytes() const override;
|
||||||
void protect() override;
|
void protect() override;
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
|
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||||
|
|
||||||
|
@ -308,6 +308,28 @@ void ColumnSparse::popBack(size_t n)
|
|||||||
_size = new_size;
|
_size = new_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
|
||||||
|
{
|
||||||
|
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnSparse::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
checkpoint.size = size();
|
||||||
|
values->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
_size = checkpoint.size;
|
||||||
|
|
||||||
|
const auto & nested = *assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested;
|
||||||
|
chassert(nested.size > 0);
|
||||||
|
|
||||||
|
values->rollback(nested);
|
||||||
|
getOffsetsData().resize_assume_reserved(nested.size - 1);
|
||||||
|
}
|
||||||
|
|
||||||
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
|
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
|
||||||
{
|
{
|
||||||
if (_size != filt.size())
|
if (_size != filt.size())
|
||||||
|
@ -149,6 +149,10 @@ public:
|
|||||||
|
|
||||||
ColumnPtr compress() const override;
|
ColumnPtr compress() const override;
|
||||||
|
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
|
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||||
|
|
||||||
|
@ -240,6 +240,23 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
|
|||||||
return permuteImpl(*this, perm, limit);
|
return permuteImpl(*this, perm, limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnString::getCheckpoint() const
|
||||||
|
{
|
||||||
|
auto nested = std::make_shared<ColumnCheckpoint>(chars.size());
|
||||||
|
return std::make_shared<ColumnCheckpointWithNested>(size(), std::move(nested));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnString::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
checkpoint.size = size();
|
||||||
|
assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested->size = chars.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnString::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
offsets.resize_assume_reserved(checkpoint.size);
|
||||||
|
chars.resize_assume_reserved(assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested->size);
|
||||||
|
}
|
||||||
|
|
||||||
void ColumnString::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
|
void ColumnString::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
|
||||||
{
|
{
|
||||||
|
@ -194,6 +194,10 @@ public:
|
|||||||
offsets.resize_assume_reserved(offsets.size() - n);
|
offsets.resize_assume_reserved(offsets.size() - n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
|
|
||||||
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
|
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
|
||||||
|
|
||||||
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
|
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;
|
||||||
|
@ -254,6 +254,37 @@ void ColumnTuple::popBack(size_t n)
|
|||||||
column->popBack(n);
|
column->popBack(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
|
||||||
|
{
|
||||||
|
ColumnCheckpoints checkpoints;
|
||||||
|
checkpoints.reserve(columns.size());
|
||||||
|
|
||||||
|
for (const auto & column : columns)
|
||||||
|
checkpoints.push_back(column->getCheckpoint());
|
||||||
|
|
||||||
|
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnTuple::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
|
||||||
|
chassert(checkpoints.size() == columns.size());
|
||||||
|
|
||||||
|
checkpoint.size = size();
|
||||||
|
for (size_t i = 0; i < columns.size(); ++i)
|
||||||
|
columns[i]->updateCheckpoint(*checkpoints[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
column_length = checkpoint.size;
|
||||||
|
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
|
||||||
|
|
||||||
|
chassert(columns.size() == checkpoints.size());
|
||||||
|
for (size_t i = 0; i < columns.size(); ++i)
|
||||||
|
columns[i]->rollback(*checkpoints[i]);
|
||||||
|
}
|
||||||
|
|
||||||
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
|
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
|
||||||
{
|
{
|
||||||
if (columns.empty())
|
if (columns.empty())
|
||||||
|
@ -118,6 +118,9 @@ public:
|
|||||||
size_t byteSizeAt(size_t n) const override;
|
size_t byteSizeAt(size_t n) const override;
|
||||||
size_t allocatedBytes() const override;
|
size_t allocatedBytes() const override;
|
||||||
void protect() override;
|
void protect() override;
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||||
bool structureEquals(const IColumn & rhs) const override;
|
bool structureEquals(const IColumn & rhs) const override;
|
||||||
|
@ -739,6 +739,39 @@ void ColumnVariant::popBack(size_t n)
|
|||||||
offsets->popBack(n);
|
offsets->popBack(n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
|
||||||
|
{
|
||||||
|
ColumnCheckpoints checkpoints;
|
||||||
|
checkpoints.reserve(variants.size());
|
||||||
|
|
||||||
|
for (const auto & column : variants)
|
||||||
|
checkpoints.push_back(column->getCheckpoint());
|
||||||
|
|
||||||
|
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnVariant::updateCheckpoint(ColumnCheckpoint & checkpoint) const
|
||||||
|
{
|
||||||
|
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
|
||||||
|
chassert(checkpoints.size() == variants.size());
|
||||||
|
|
||||||
|
checkpoint.size = size();
|
||||||
|
for (size_t i = 0; i < variants.size(); ++i)
|
||||||
|
variants[i]->updateCheckpoint(*checkpoints[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
|
||||||
|
{
|
||||||
|
getOffsets().resize_assume_reserved(checkpoint.size);
|
||||||
|
getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
|
||||||
|
|
||||||
|
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
|
||||||
|
chassert(variants.size() == checkpoints.size());
|
||||||
|
|
||||||
|
for (size_t i = 0; i < variants.size(); ++i)
|
||||||
|
variants[i]->rollback(*checkpoints[i]);
|
||||||
|
}
|
||||||
|
|
||||||
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
|
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
|
||||||
{
|
{
|
||||||
/// During any serialization/deserialization we should always use global discriminators.
|
/// During any serialization/deserialization we should always use global discriminators.
|
||||||
|
@ -248,6 +248,9 @@ public:
|
|||||||
size_t byteSizeAt(size_t n) const override;
|
size_t byteSizeAt(size_t n) const override;
|
||||||
size_t allocatedBytes() const override;
|
size_t allocatedBytes() const override;
|
||||||
void protect() override;
|
void protect() override;
|
||||||
|
ColumnCheckpointPtr getCheckpoint() const override;
|
||||||
|
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
|
||||||
|
void rollback(const ColumnCheckpoint & checkpoint) override;
|
||||||
void forEachSubcolumn(MutableColumnCallback callback) override;
|
void forEachSubcolumn(MutableColumnCallback callback) override;
|
||||||
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
|
||||||
bool structureEquals(const IColumn & rhs) const override;
|
bool structureEquals(const IColumn & rhs) const override;
|
||||||
|
@ -49,6 +49,40 @@ struct EqualRange
|
|||||||
|
|
||||||
using EqualRanges = std::vector<EqualRange>;
|
using EqualRanges = std::vector<EqualRange>;
|
||||||
|
|
||||||
|
/// A checkpoint that contains size of column and all its subcolumns.
|
||||||
|
/// It can be used to rollback column to the previous state, for example
|
||||||
|
/// after failed parsing when column may be in inconsistent state.
|
||||||
|
struct ColumnCheckpoint
|
||||||
|
{
|
||||||
|
size_t size;
|
||||||
|
|
||||||
|
explicit ColumnCheckpoint(size_t size_) : size(size_) {}
|
||||||
|
virtual ~ColumnCheckpoint() = default;
|
||||||
|
};
|
||||||
|
|
||||||
|
using ColumnCheckpointPtr = std::shared_ptr<ColumnCheckpoint>;
|
||||||
|
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
|
||||||
|
|
||||||
|
struct ColumnCheckpointWithNested : public ColumnCheckpoint
|
||||||
|
{
|
||||||
|
ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_)
|
||||||
|
: ColumnCheckpoint(size_), nested(std::move(nested_))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ColumnCheckpointPtr nested;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ColumnCheckpointWithMultipleNested : public ColumnCheckpoint
|
||||||
|
{
|
||||||
|
ColumnCheckpointWithMultipleNested(size_t size_, ColumnCheckpoints nested_)
|
||||||
|
: ColumnCheckpoint(size_), nested(std::move(nested_))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ColumnCheckpoints nested;
|
||||||
|
};
|
||||||
|
|
||||||
/// Declares interface to store columns in memory.
|
/// Declares interface to store columns in memory.
|
||||||
class IColumn : public COW<IColumn>
|
class IColumn : public COW<IColumn>
|
||||||
{
|
{
|
||||||
@ -509,6 +543,17 @@ public:
|
|||||||
/// The operation is slow and performed only for debug builds.
|
/// The operation is slow and performed only for debug builds.
|
||||||
virtual void protect() {}
|
virtual void protect() {}
|
||||||
|
|
||||||
|
/// Returns checkpoint of current state of column.
|
||||||
|
virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared<ColumnCheckpoint>(size()); }
|
||||||
|
|
||||||
|
/// Updates the checkpoint with current state. It is used to avoid extra allocations in 'getCheckpoint'.
|
||||||
|
virtual void updateCheckpoint(ColumnCheckpoint & checkpoint) const { checkpoint.size = size(); }
|
||||||
|
|
||||||
|
/// Rollbacks column to the checkpoint.
|
||||||
|
/// Unlike 'popBack' this method should work correctly even if column has invalid state.
|
||||||
|
/// Sizes of columns in checkpoint must be less or equal than current size.
|
||||||
|
virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); }
|
||||||
|
|
||||||
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
|
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.
|
||||||
/// Shallow: doesn't do recursive calls; don't do call for itself.
|
/// Shallow: doesn't do recursive calls; don't do call for itself.
|
||||||
|
|
||||||
|
@ -920,3 +920,72 @@ TEST(ColumnDynamic, compare)
|
|||||||
ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1);
|
ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1);
|
||||||
ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1);
|
ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(ColumnDynamic, rollback)
|
||||||
|
{
|
||||||
|
auto check_variant = [](const ColumnVariant & column_variant, std::vector<size_t> sizes)
|
||||||
|
{
|
||||||
|
ASSERT_EQ(column_variant.getNumVariants(), sizes.size());
|
||||||
|
size_t num_rows = 0;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < sizes.size(); ++i)
|
||||||
|
{
|
||||||
|
ASSERT_EQ(column_variant.getVariants()[i]->size(), sizes[i]);
|
||||||
|
num_rows += sizes[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(num_rows, column_variant.size());
|
||||||
|
};
|
||||||
|
|
||||||
|
auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::vector<size_t> sizes)
|
||||||
|
{
|
||||||
|
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(cp).nested;
|
||||||
|
ASSERT_EQ(nested.size(), sizes.size());
|
||||||
|
size_t num_rows = 0;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < sizes.size(); ++i)
|
||||||
|
{
|
||||||
|
ASSERT_EQ(nested[i]->size, sizes[i]);
|
||||||
|
num_rows += sizes[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(num_rows, cp.size);
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<std::pair<ColumnCheckpointPtr, std::vector<size_t>>> checkpoints;
|
||||||
|
|
||||||
|
auto column = ColumnDynamic::create(2);
|
||||||
|
auto checkpoint = column->getCheckpoint();
|
||||||
|
|
||||||
|
column->insert(Field(42));
|
||||||
|
|
||||||
|
column->updateCheckpoint(*checkpoint);
|
||||||
|
checkpoints.emplace_back(checkpoint, std::vector<size_t>{0, 1});
|
||||||
|
|
||||||
|
column->insert(Field("str1"));
|
||||||
|
column->rollback(*checkpoint);
|
||||||
|
|
||||||
|
check_checkpoint(*checkpoint, checkpoints.back().second);
|
||||||
|
check_variant(column->getVariantColumn(), checkpoints.back().second);
|
||||||
|
|
||||||
|
column->insert("str1");
|
||||||
|
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 1});
|
||||||
|
|
||||||
|
column->insert("str2");
|
||||||
|
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 2});
|
||||||
|
|
||||||
|
column->insert(Array({1, 2}));
|
||||||
|
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{1, 1, 2});
|
||||||
|
|
||||||
|
column->insert(Field(42.42));
|
||||||
|
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{2, 1, 2});
|
||||||
|
|
||||||
|
for (const auto & [cp, sizes] : checkpoints)
|
||||||
|
{
|
||||||
|
auto column_copy = column->clone();
|
||||||
|
column_copy->rollback(*cp);
|
||||||
|
|
||||||
|
check_checkpoint(*cp, sizes);
|
||||||
|
check_variant(assert_cast<const ColumnDynamic &>(*column_copy).getVariantColumn(), sizes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#include <IO/WriteBufferFromString.h>
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
|
||||||
#include <Common/Arena.h>
|
#include <Common/Arena.h>
|
||||||
|
#include "Core/Field.h"
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
using namespace DB;
|
using namespace DB;
|
||||||
@ -349,3 +350,65 @@ TEST(ColumnObject, SkipSerializedInArena)
|
|||||||
pos = col2->skipSerializedInArena(pos);
|
pos = col2->skipSerializedInArena(pos);
|
||||||
ASSERT_EQ(pos, end);
|
ASSERT_EQ(pos, end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(ColumnObject, rollback)
|
||||||
|
{
|
||||||
|
auto type = DataTypeFactory::instance().get("JSON(max_dynamic_types=10, max_dynamic_paths=2, a.a UInt32, a.b UInt32)");
|
||||||
|
auto col = type->createColumn();
|
||||||
|
auto & col_object = assert_cast<ColumnObject &>(*col);
|
||||||
|
const auto & typed_paths = col_object.getTypedPaths();
|
||||||
|
const auto & dynamic_paths = col_object.getDynamicPaths();
|
||||||
|
const auto & shared_data = col_object.getSharedDataColumn();
|
||||||
|
|
||||||
|
auto assert_sizes = [&](size_t size)
|
||||||
|
{
|
||||||
|
for (const auto & [name, column] : typed_paths)
|
||||||
|
ASSERT_EQ(column->size(), size);
|
||||||
|
|
||||||
|
for (const auto & [name, column] : dynamic_paths)
|
||||||
|
ASSERT_EQ(column->size(), size);
|
||||||
|
|
||||||
|
ASSERT_EQ(shared_data.size(), size);
|
||||||
|
};
|
||||||
|
|
||||||
|
auto checkpoint = col_object.getCheckpoint();
|
||||||
|
|
||||||
|
col_object.insert(Object{{"a.a", Field{1u}}});
|
||||||
|
col_object.updateCheckpoint(*checkpoint);
|
||||||
|
|
||||||
|
col_object.insert(Object{{"a.b", Field{2u}}});
|
||||||
|
col_object.insert(Object{{"a.a", Field{3u}}});
|
||||||
|
|
||||||
|
col_object.rollback(*checkpoint);
|
||||||
|
|
||||||
|
assert_sizes(1);
|
||||||
|
ASSERT_EQ(typed_paths.size(), 2);
|
||||||
|
ASSERT_EQ(dynamic_paths.size(), 0);
|
||||||
|
|
||||||
|
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
|
||||||
|
ASSERT_EQ((*typed_paths.at("a.b"))[0], Field{0u});
|
||||||
|
|
||||||
|
col_object.insert(Object{{"a.c", Field{"ccc"}}});
|
||||||
|
|
||||||
|
checkpoint = col_object.getCheckpoint();
|
||||||
|
|
||||||
|
col_object.insert(Object{{"a.d", Field{"ddd"}}});
|
||||||
|
col_object.insert(Object{{"a.e", Field{"eee"}}});
|
||||||
|
|
||||||
|
assert_sizes(4);
|
||||||
|
ASSERT_EQ(typed_paths.size(), 2);
|
||||||
|
ASSERT_EQ(dynamic_paths.size(), 2);
|
||||||
|
|
||||||
|
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
|
||||||
|
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
|
||||||
|
ASSERT_EQ((*dynamic_paths.at("a.d"))[2], Field{"ddd"});
|
||||||
|
|
||||||
|
col_object.rollback(*checkpoint);
|
||||||
|
|
||||||
|
assert_sizes(2);
|
||||||
|
ASSERT_EQ(typed_paths.size(), 2);
|
||||||
|
ASSERT_EQ(dynamic_paths.size(), 1);
|
||||||
|
|
||||||
|
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
|
||||||
|
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
|
||||||
|
}
|
||||||
|
@ -1023,15 +1023,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
|
|||||||
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
|
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||||
{
|
{
|
||||||
current_exception = e.displayText();
|
current_exception = e.displayText();
|
||||||
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
|
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
|
||||||
key.query_str, current_entry->query_id, current_exception);
|
key.query_str, current_entry->query_id, current_exception);
|
||||||
|
|
||||||
for (const auto & column : result_columns)
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
if (column->size() > total_rows)
|
result_columns[i]->rollback(*checkpoints[i]);
|
||||||
column->popBack(column->size() - total_rows);
|
|
||||||
|
|
||||||
current_entry->finish(std::current_exception());
|
current_entry->finish(std::current_exception());
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -22,8 +22,12 @@ StreamingFormatExecutor::StreamingFormatExecutor(
|
|||||||
, adding_defaults_transform(std::move(adding_defaults_transform_))
|
, adding_defaults_transform(std::move(adding_defaults_transform_))
|
||||||
, port(format->getPort().getHeader(), format.get())
|
, port(format->getPort().getHeader(), format.get())
|
||||||
, result_columns(header.cloneEmptyColumns())
|
, result_columns(header.cloneEmptyColumns())
|
||||||
|
, checkpoints(result_columns.size())
|
||||||
{
|
{
|
||||||
connect(format->getPort(), port);
|
connect(format->getPort(), port);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
|
checkpoints[i] = result_columns[i]->getCheckpoint();
|
||||||
}
|
}
|
||||||
|
|
||||||
MutableColumns StreamingFormatExecutor::getResultColumns()
|
MutableColumns StreamingFormatExecutor::getResultColumns()
|
||||||
@ -53,6 +57,9 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
|
|||||||
|
|
||||||
size_t StreamingFormatExecutor::execute()
|
size_t StreamingFormatExecutor::execute()
|
||||||
{
|
{
|
||||||
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
|
result_columns[i]->updateCheckpoint(*checkpoints[i]);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
size_t new_rows = 0;
|
size_t new_rows = 0;
|
||||||
@ -85,19 +92,19 @@ size_t StreamingFormatExecutor::execute()
|
|||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
{
|
{
|
||||||
format->resetParser();
|
format->resetParser();
|
||||||
return on_error(result_columns, e);
|
return on_error(result_columns, checkpoints, e);
|
||||||
}
|
}
|
||||||
catch (std::exception & e)
|
catch (std::exception & e)
|
||||||
{
|
{
|
||||||
format->resetParser();
|
format->resetParser();
|
||||||
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
|
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
|
||||||
return on_error(result_columns, exception);
|
return on_error(result_columns, checkpoints, exception);
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
format->resetParser();
|
format->resetParser();
|
||||||
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName());
|
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName());
|
||||||
return on_error(result_columns, exception);
|
return on_error(result_columns, checkpoints, exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,12 +19,12 @@ public:
|
|||||||
/// and exception to rethrow it or add context to it.
|
/// and exception to rethrow it or add context to it.
|
||||||
/// Should return number of new rows, which are added in callback
|
/// Should return number of new rows, which are added in callback
|
||||||
/// to result columns in comparison to previous call of `execute`.
|
/// to result columns in comparison to previous call of `execute`.
|
||||||
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
|
using ErrorCallback = std::function<size_t(const MutableColumns &, const ColumnCheckpoints &, Exception &)>;
|
||||||
|
|
||||||
StreamingFormatExecutor(
|
StreamingFormatExecutor(
|
||||||
const Block & header_,
|
const Block & header_,
|
||||||
InputFormatPtr format_,
|
InputFormatPtr format_,
|
||||||
ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); },
|
ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints, Exception & e) -> size_t { throw std::move(e); },
|
||||||
SimpleTransformPtr adding_defaults_transform_ = nullptr);
|
SimpleTransformPtr adding_defaults_transform_ = nullptr);
|
||||||
|
|
||||||
/// Returns numbers of new read rows.
|
/// Returns numbers of new read rows.
|
||||||
@ -50,6 +50,7 @@ private:
|
|||||||
|
|
||||||
InputPort port;
|
InputPort port;
|
||||||
MutableColumns result_columns;
|
MutableColumns result_columns;
|
||||||
|
ColumnCheckpoints checkpoints;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -105,6 +105,10 @@ Chunk IRowInputFormat::read()
|
|||||||
size_t num_columns = header.columns();
|
size_t num_columns = header.columns();
|
||||||
MutableColumns columns = header.cloneEmptyColumns();
|
MutableColumns columns = header.cloneEmptyColumns();
|
||||||
|
|
||||||
|
ColumnCheckpoints checkpoints(columns.size());
|
||||||
|
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
|
||||||
|
checkpoints[column_idx] = columns[column_idx]->getCheckpoint();
|
||||||
|
|
||||||
block_missing_values.clear();
|
block_missing_values.clear();
|
||||||
|
|
||||||
size_t num_rows = 0;
|
size_t num_rows = 0;
|
||||||
@ -130,6 +134,9 @@ Chunk IRowInputFormat::read()
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
|
||||||
|
columns[column_idx]->updateCheckpoint(*checkpoints[column_idx]);
|
||||||
|
|
||||||
info.read_columns.clear();
|
info.read_columns.clear();
|
||||||
continue_reading = readRow(columns, info);
|
continue_reading = readRow(columns, info);
|
||||||
|
|
||||||
@ -193,14 +200,9 @@ Chunk IRowInputFormat::read()
|
|||||||
|
|
||||||
syncAfterError();
|
syncAfterError();
|
||||||
|
|
||||||
/// Truncate all columns in block to initial size (remove values, that was appended to only part of columns).
|
/// Rollback all columns in block to initial size (remove values, that was appended to only part of columns).
|
||||||
|
|
||||||
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
|
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
|
||||||
{
|
columns[column_idx]->rollback(*checkpoints[column_idx]);
|
||||||
auto & column = columns[column_idx];
|
|
||||||
if (column->size() > num_rows)
|
|
||||||
column->popBack(column->size() - num_rows);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,21 +86,18 @@ Chunk FileLogSource::generate()
|
|||||||
std::optional<String> exception_message;
|
std::optional<String> exception_message;
|
||||||
size_t total_rows = 0;
|
size_t total_rows = 0;
|
||||||
|
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||||
{
|
{
|
||||||
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
||||||
{
|
{
|
||||||
exception_message = e.message();
|
exception_message = e.message();
|
||||||
for (const auto & column : result_columns)
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
{
|
{
|
||||||
// We could already push some rows to result_columns
|
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||||
// before exception, we need to fix it.
|
result_columns[i]->rollback(*checkpoints[i]);
|
||||||
auto cur_rows = column->size();
|
|
||||||
if (cur_rows > total_rows)
|
|
||||||
column->popBack(cur_rows - total_rows);
|
|
||||||
|
|
||||||
// All data columns will get default value in case of error.
|
// All data columns will get default value in case of error.
|
||||||
column->insertDefault();
|
result_columns[i]->insertDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -108,23 +108,20 @@ Chunk KafkaSource::generateImpl()
|
|||||||
size_t total_rows = 0;
|
size_t total_rows = 0;
|
||||||
size_t failed_poll_attempts = 0;
|
size_t failed_poll_attempts = 0;
|
||||||
|
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
||||||
|
|
||||||
if (put_error_to_stream)
|
if (put_error_to_stream)
|
||||||
{
|
{
|
||||||
exception_message = e.message();
|
exception_message = e.message();
|
||||||
for (const auto & column : result_columns)
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
{
|
{
|
||||||
// read_kafka_message could already push some rows to result_columns
|
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||||
// before exception, we need to fix it.
|
result_columns[i]->rollback(*checkpoints[i]);
|
||||||
auto cur_rows = column->size();
|
|
||||||
if (cur_rows > total_rows)
|
|
||||||
column->popBack(cur_rows - total_rows);
|
|
||||||
|
|
||||||
// all data columns will get default value in case of error
|
// all data columns will get default value in case of error
|
||||||
column->insertDefault();
|
result_columns[i]->insertDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -817,23 +817,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
|
|||||||
size_t total_rows = 0;
|
size_t total_rows = 0;
|
||||||
size_t failed_poll_attempts = 0;
|
size_t failed_poll_attempts = 0;
|
||||||
|
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||||
{
|
{
|
||||||
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
||||||
|
|
||||||
if (put_error_to_stream)
|
if (put_error_to_stream)
|
||||||
{
|
{
|
||||||
exception_message = e.message();
|
exception_message = e.message();
|
||||||
for (const auto & column : result_columns)
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
{
|
{
|
||||||
// read_kafka_message could already push some rows to result_columns
|
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||||
// before exception, we need to fix it.
|
result_columns[i]->rollback(*checkpoints[i]);
|
||||||
auto cur_rows = column->size();
|
|
||||||
if (cur_rows > total_rows)
|
|
||||||
column->popBack(cur_rows - total_rows);
|
|
||||||
|
|
||||||
// all data columns will get default value in case of error
|
// all data columns will get default value in case of error
|
||||||
column->insertDefault();
|
result_columns[i]->insertDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -102,21 +102,18 @@ Chunk NATSSource::generate()
|
|||||||
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
|
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
|
||||||
std::optional<String> exception_message;
|
std::optional<String> exception_message;
|
||||||
size_t total_rows = 0;
|
size_t total_rows = 0;
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||||
{
|
{
|
||||||
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
||||||
{
|
{
|
||||||
exception_message = e.message();
|
exception_message = e.message();
|
||||||
for (const auto & column : result_columns)
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
{
|
{
|
||||||
// We could already push some rows to result_columns
|
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||||
// before exception, we need to fix it.
|
result_columns[i]->rollback(*checkpoints[i]);
|
||||||
auto cur_rows = column->size();
|
|
||||||
if (cur_rows > total_rows)
|
|
||||||
column->popBack(cur_rows - total_rows);
|
|
||||||
|
|
||||||
// All data columns will get default value in case of error.
|
// All data columns will get default value in case of error.
|
||||||
column->insertDefault();
|
result_columns[i]->insertDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -161,21 +161,18 @@ Chunk RabbitMQSource::generateImpl()
|
|||||||
std::optional<String> exception_message;
|
std::optional<String> exception_message;
|
||||||
size_t total_rows = 0;
|
size_t total_rows = 0;
|
||||||
|
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
|
||||||
{
|
{
|
||||||
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
|
||||||
{
|
{
|
||||||
exception_message = e.message();
|
exception_message = e.message();
|
||||||
for (const auto & column : result_columns)
|
for (size_t i = 0; i < result_columns.size(); ++i)
|
||||||
{
|
{
|
||||||
// We could already push some rows to result_columns
|
// We could already push some rows to result_columns before exception, we need to fix it.
|
||||||
// before exception, we need to fix it.
|
result_columns[i]->rollback(*checkpoints[i]);
|
||||||
auto cur_rows = column->size();
|
|
||||||
if (cur_rows > total_rows)
|
|
||||||
column->popBack(cur_rows - total_rows);
|
|
||||||
|
|
||||||
// All data columns will get default value in case of error.
|
// All data columns will get default value in case of error.
|
||||||
column->insertDefault();
|
result_columns[i]->insertDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
23
tests/queries/0_stateless/03230_async_insert_native.sh
Executable file
23
tests/queries/0_stateless/03230_async_insert_native.sh
Executable file
@ -0,0 +1,23 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} -q "
|
||||||
|
DROP TABLE IF EXISTS async_inserts_native;
|
||||||
|
CREATE TABLE async_inserts_native (m Map(UInt64, UInt64), v UInt64 MATERIALIZED m[4]) ENGINE = Memory;
|
||||||
|
"
|
||||||
|
|
||||||
|
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=1000&async_insert_busy_timeout_min_ms=1000&wait_for_async_insert=1"
|
||||||
|
|
||||||
|
# This test runs inserts with memory_tracker_fault_probability > 0 to trigger memory limit during insertion.
|
||||||
|
# If rollback of columns is wrong in that case it may produce LOGICAL_ERROR and it will caught by termintation of server in debug mode.
|
||||||
|
for _ in {1..10}; do
|
||||||
|
${CLICKHOUSE_CLIENT} -q "SELECT (range(number), range(number))::Map(UInt64, UInt64) AS m FROM numbers(1000) FORMAT Native" | \
|
||||||
|
${CLICKHOUSE_CURL} -sS -X POST "${url}&max_block_size=100&memory_tracker_fault_probability=0.01&query=INSERT+INTO+async_inserts_native+FORMAT+Native" --data-binary @- >/dev/null 2>&1 &
|
||||||
|
done
|
||||||
|
|
||||||
|
wait
|
||||||
|
|
||||||
|
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_native;"
|
18
tests/queries/0_stateless/03231_bson_tuple_array_map.sh
Executable file
18
tests/queries/0_stateless/03231_bson_tuple_array_map.sh
Executable file
@ -0,0 +1,18 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data
|
||||||
|
|
||||||
|
$CLICKHOUSE_LOCAL -q "select tuple(1, x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE
|
||||||
|
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Tuple(UInt32, IPv6)') settings input_format_allow_errors_num=1"
|
||||||
|
|
||||||
|
$CLICKHOUSE_LOCAL -q "select [x'00000000000000000000FFFF00000000', x'00000000000000000000FFFF0000000000'] as x format BSONEachRow" > $DATA_FILE
|
||||||
|
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Array(IPv6)') settings input_format_allow_errors_num=1"
|
||||||
|
|
||||||
|
$CLICKHOUSE_LOCAL -q "select map('key1', x'00000000000000000000FFFF00000000', 'key2', x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE
|
||||||
|
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Map(String, IPv6)') settings input_format_allow_errors_num=1"
|
||||||
|
|
||||||
|
rm $DATA_FILE
|
Loading…
Reference in New Issue
Block a user