fix rollback of ColumnDynamic

This commit is contained in:
Anton Popov 2024-09-03 18:21:53 +00:00
parent d932d0ae4f
commit 4cd8272186
4 changed files with 174 additions and 19 deletions

View File

@ -979,6 +979,90 @@ ColumnPtr ColumnDynamic::compress() const
});
}
/// Brings `checkpoint` up to date with the current state of this column.
/// Nested checkpoints that already exist are refreshed in place; variants that
/// have no nested checkpoint yet get a freshly created one. Finally the
/// top-level checkpoint size is set to the current number of rows.
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
    auto & nested_checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
    const auto & variant_columns = variant_column_ptr->getVariants();

    const size_t num_known = nested_checkpoints.size();
    const size_t num_variants = variant_columns.size();

    /// Variants that were already tracked by the checkpoint.
    for (size_t pos = 0; pos != num_known; ++pos)
        variant_columns[pos]->updateCheckpoint(*nested_checkpoints[pos]);

    /// Variants that appeared since the checkpoint was last updated.
    if (num_known < num_variants)
    {
        nested_checkpoints.resize(num_variants);
        for (size_t pos = num_known; pos != num_variants; ++pos)
            nested_checkpoints[pos] = variant_columns[pos]->getCheckpoint();
    }

    checkpoint.size = size();
}
/// Builds a new Variant data type from `info` with the last `n` variants removed.
/// The variants to drop are identified by the last `n` entries of
/// `local_to_global_discriminators`; `n` must not exceed the size of that mapping.
DataTypePtr ColumnDynamic::popBackVariants(const VariantInfo & info, const std::vector<ColumnVariant::Discriminator> & local_to_global_discriminators, size_t n)
{
    const auto & current_variant_type = assert_cast<const DataTypeVariant &>(*info.variant_type);

    /// Global discriminator -> variant type name.
    std::unordered_map<ColumnVariant::Discriminator, String> names_by_discriminator;
    for (const auto & [variant_name, global_discriminator] : info.variant_name_to_discriminator)
        names_by_discriminator.emplace(global_discriminator, variant_name);

    /// Variant type name -> nested data type.
    std::unordered_map<String, DataTypePtr> types_by_name;
    for (const auto & nested_type : current_variant_type.getVariants())
        types_by_name.emplace(nested_type->getName(), nested_type);

    /// Forget the variants referenced by the trailing n entries of the mapping.
    /// This relies on the invariant that ColumnVariant always appends new variants at the end.
    const size_t mapping_size = local_to_global_discriminators.size();
    for (size_t dropped = 0; dropped != n; ++dropped)
        names_by_discriminator.erase(local_to_global_discriminators[mapping_size - 1 - dropped]);

    /// Everything that is left forms the new Variant type.
    DataTypes remaining_types;
    for (const auto & [global_discriminator, variant_name] : names_by_discriminator)
        remaining_types.push_back(types_by_name.at(variant_name));

    return std::make_shared<DataTypeVariant>(std::move(remaining_types));
}
/// Restores the column to the state captured by `checkpoint`.
///
/// If the number of variants has not changed since the checkpoint was taken, the
/// rollback is delegated to the nested Variant column. Otherwise the variants added
/// after the checkpoint are dropped: the variant type info is rebuilt without them,
/// the discriminators/offsets columns are truncated to `checkpoint.size` rows, the
/// remaining variants are rolled back to their nested checkpoints, and a new Variant
/// column is assembled from these parts.
void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint)
{
    const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
    const size_t num_variants = variant_column_ptr->getNumVariants();
    chassert(nested.size() <= num_variants);

    /// The structure hasn't changed, so we can use generic rollback of Variant column
    if (nested.size() == num_variants)
    {
        variant_column_ptr->rollback(checkpoint);
        return;
    }

    /// Take copies of the Variant column internals; they are adjusted below and used to build a new column.
    auto new_subcolumns = variant_column_ptr->getVariants();
    auto new_discriminators_map = variant_column_ptr->getLocalToGlobalDiscriminatorsMapping();
    auto new_discriminators_column = variant_column_ptr->getLocalDiscriminatorsPtr();
    auto new_offsets_column = variant_column_ptr->getOffsetsPtr();

    /// Remove new variants that were added since last checkpoint.
    /// The new type must be computed before createVariantInfo() replaces variant_info.
    auto new_variant_type = popBackVariants(variant_info, new_discriminators_map, num_variants - nested.size());
    createVariantInfo(new_variant_type);
    /// Drop cached mappings — presumably they were computed against the variant set that is being replaced.
    variant_mappings_cache.clear();

    new_subcolumns.resize(nested.size());
    new_discriminators_map.resize(nested.size());

    /// Manually rollback internals of Variant column
    new_discriminators_column->assumeMutable()->popBack(new_discriminators_column->size() - checkpoint.size);
    new_offsets_column->assumeMutable()->popBack(new_offsets_column->size() - checkpoint.size);

    for (size_t i = 0; i < nested.size(); ++i)
        new_subcolumns[i]->rollback(*nested[i]);

    variant_column = ColumnVariant::create(new_discriminators_column, new_offsets_column, Columns(new_subcolumns.begin(), new_subcolumns.end()), new_discriminators_map);
    /// Keep the raw pointer in sync with the newly created Variant column.
    variant_column_ptr = assert_cast<ColumnVariant *>(variant_column.get());
}
void ColumnDynamic::prepareForSquashing(const Columns & source_columns)
{
if (source_columns.empty())

View File

@ -309,15 +309,9 @@ public:
return variant_column_ptr->getCheckpoint();
}
/// NOTE(review): this listing shows both an inline body and an out-of-line declaration
/// of the same override; presumably the inline body is the variant removed by the
/// commit and the declaration is its replacement — confirm against the raw diff.
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override
{
/// Inline variant: forwards the call unchanged to the nested Variant column.
variant_column_ptr->updateCheckpoint(checkpoint);
}
/// Declaration-only variant of the same override (no body in this listing).
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
/// NOTE(review): this listing shows both an inline body and an out-of-line declaration
/// of the same override; presumably the inline body is the variant removed by the
/// commit and the declaration is its replacement — confirm against the raw diff.
void rollback(const ColumnCheckpoint & checkpoint) override
{
/// Inline variant: forwards the call unchanged to the nested Variant column.
variant_column_ptr->rollback(checkpoint);
}
/// Declaration-only variant of the same override (no body in this listing).
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
@ -456,6 +450,8 @@ private:
void updateVariantInfoAndExpandVariantColumn(const DataTypePtr & new_variant_type);
/// Returns a new Variant data type built from `info` without the variants referenced
/// by the last `n` entries of `local_to_global_discriminators`.
static DataTypePtr popBackVariants(const VariantInfo & info, const std::vector<ColumnVariant::Discriminator> & local_to_global_discriminators, size_t n);
WrappedPtr variant_column;
/// Store and use pointer to ColumnVariant to avoid virtual calls.
/// ColumnDynamic is widely used inside ColumnObject for each path and

View File

@ -712,20 +712,26 @@ void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
{
/// NOTE(review): the lines below appear to interleave two variants of this body
/// (a pair of loops that look every path up with .at(), and a generic lambda that
/// tolerates paths missing from the checkpoint) — presumably the removed and added
/// sides of the diff; confirm against the raw diff before editing.
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
for (auto & [name, column] : typed_paths)
/// Helper applied to one map of path columns and the matching map of nested checkpoints.
auto rollback_columns = [&](auto & columns_map, const auto & checkpoints_map)
{
const auto & nested = object_checkpoint.typed_paths.at(name);
chassert(nested);
column->rollback(*nested);
}
/// Paths are collected here first and erased after the loop, not while iterating.
NameSet names_to_remove;
for (auto & [name, column] : dynamic_paths_ptrs)
{
const auto & nested = object_checkpoint.dynamic_paths.at(name);
chassert(nested);
column->rollback(*nested);
}
/// Rollback subcolumns and remove paths that were not in checkpoint.
for (auto & [name, column] : columns_map)
{
auto it = checkpoints_map.find(name);
if (it == checkpoints_map.end())
names_to_remove.insert(name);
else
column->rollback(*it->second);
}
for (const auto & name : names_to_remove)
columns_map.erase(name);
};
rollback_columns(typed_paths, object_checkpoint.typed_paths);
/// NOTE(review): paths erased from dynamic_paths here are not erased from
/// dynamic_paths_ptrs anywhere in the visible code — verify that the pointer map
/// cannot keep entries for removed paths.
rollback_columns(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->rollback(*object_checkpoint.shared_data);
}

View File

@ -920,3 +920,72 @@ TEST(ColumnDynamic, compare)
ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1);
ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1);
}
/// Checks that rolling a ColumnDynamic back to a checkpoint restores the number of
/// variants and the size of every variant, both when the set of variants is unchanged
/// and when variants were added after the checkpoint was taken.
TEST(ColumnDynamic, rollback)
{
    /// Expected sizes are listed per variant, in the order returned by getVariants().
    using ExpectedSizes = std::vector<size_t>;

    /// The Variant column must have exactly the expected variants, and their sizes must add up to its row count.
    auto expect_variant_sizes = [](const ColumnVariant & variant, const ExpectedSizes & expected)
    {
        ASSERT_EQ(variant.getNumVariants(), expected.size());

        size_t total_rows = 0;
        for (size_t pos = 0; pos != expected.size(); ++pos)
        {
            ASSERT_EQ(variant.getVariants()[pos]->size(), expected[pos]);
            total_rows += expected[pos];
        }

        ASSERT_EQ(total_rows, variant.size());
    };

    /// The checkpoint must hold one nested checkpoint per expected variant, with matching sizes.
    auto expect_checkpoint_sizes = [](const ColumnCheckpoint & cp, const ExpectedSizes & expected)
    {
        const auto & nested_checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(cp).nested;
        ASSERT_EQ(nested_checkpoints.size(), expected.size());

        size_t total_rows = 0;
        for (size_t pos = 0; pos != expected.size(); ++pos)
        {
            ASSERT_EQ(nested_checkpoints[pos]->size, expected[pos]);
            total_rows += expected[pos];
        }

        ASSERT_EQ(total_rows, cp.size);
    };

    std::vector<std::pair<ColumnCheckpointPtr, ExpectedSizes>> snapshots;

    auto column = ColumnDynamic::create(2);
    auto first_checkpoint = column->getCheckpoint();

    /// A checkpoint taken on the empty column and then updated after the first insert.
    column->insert(Field(42));
    column->updateCheckpoint(*first_checkpoint);
    snapshots.emplace_back(first_checkpoint, ExpectedSizes{0, 1});

    /// Insert a value of a new type and immediately roll it back on the original column.
    column->insert(Field("str1"));
    column->rollback(*first_checkpoint);

    {
        const auto & expected = snapshots.back().second;
        expect_checkpoint_sizes(*first_checkpoint, expected);
        expect_variant_sizes(column->getVariantColumn(), expected);
    }

    /// Grow the column step by step, remembering a checkpoint after every insert.
    column->insert("str1");
    snapshots.emplace_back(column->getCheckpoint(), ExpectedSizes{0, 1, 1});

    column->insert("str2");
    snapshots.emplace_back(column->getCheckpoint(), ExpectedSizes{0, 1, 2});

    column->insert(Array({1, 2}));
    snapshots.emplace_back(column->getCheckpoint(), ExpectedSizes{1, 1, 2});

    column->insert(Field(42.42));
    snapshots.emplace_back(column->getCheckpoint(), ExpectedSizes{2, 1, 2});

    /// Every remembered checkpoint must be restorable from a copy of the final column.
    for (const auto & [snapshot, expected] : snapshots)
    {
        auto restored = column->clone();
        restored->rollback(*snapshot);

        expect_checkpoint_sizes(*snapshot, expected);
        expect_variant_sizes(assert_cast<const ColumnDynamic &>(*restored).getVariantColumn(), expected);
    }
}