Revert "Revert "Fix several RENAME COLUMN bugs.""

This commit is contained in:
alesapin 2023-02-27 12:27:57 +01:00 committed by GitHub
parent 77e5b8272d
commit e3961c118a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 892 additions and 169 deletions

View File

@ -0,0 +1,55 @@
#include <Storages/MergeTree/AlterConversions.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Tells whether `old_name` is the source (pre-rename) name of any entry in rename_map.
bool AlterConversions::columnHasNewName(const std::string & old_name) const
{
    bool found = false;
    /// Stop scanning as soon as the first matching entry is seen.
    for (auto it = rename_map.begin(); it != rename_map.end() && !found; ++it)
        found = (it->rename_from == old_name);
    return found;
}
std::string AlterConversions::getColumnNewName(const std::string & old_name) const
{
for (const auto & [new_name, prev_name] : rename_map)
{
if (old_name == prev_name)
return new_name;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} was not renamed", old_name);
}
bool AlterConversions::isColumnRenamed(const std::string & new_name) const
{
for (const auto & [name_to, name_from] : rename_map)
{
if (name_to == new_name)
return true;
}
return false;
}
/// Returns the source (pre-rename) name of the first rename_map entry whose target name is `new_name`.
/// Throws LOGICAL_ERROR when no entry has `new_name` as its target.
std::string AlterConversions::getColumnOldName(const std::string & new_name) const
{
    auto it = rename_map.begin();
    while (it != rename_map.end() && it->rename_to != new_name)
        ++it;

    if (it == rename_map.end())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} was not renamed", new_name);

    return it->rename_from;
}
}

View File

@ -14,11 +14,22 @@ namespace DB
/// part->getColumns() and storage->getColumns().
struct AlterConversions
{
/// One recorded column rename.
/// NOTE: the field order (rename_to first, rename_from second) is relied upon:
/// entries are built positionally as RenamePair{new_name, old_name} and unpacked
/// with structured bindings as [rename_to, rename_from].
struct RenamePair
{
/// Column name after the rename.
std::string rename_to;
/// Column name before the rename.
std::string rename_from;
};
/// Rename map new_name -> old_name
std::unordered_map<std::string, std::string> rename_map;
std::vector<RenamePair> rename_map;
bool isColumnRenamed(const std::string & new_name) const { return rename_map.count(new_name) > 0; }
std::string getColumnOldName(const std::string & new_name) const { return rename_map.at(new_name); }
/// Column was renamed (lookup by value in rename_map)
bool columnHasNewName(const std::string & old_name) const;
/// Get new name for column (lookup by value in rename_map)
std::string getColumnNewName(const std::string & old_name) const;
/// Whether this name is the new name of a renamed column (lookup by key in rename_map)
bool isColumnRenamed(const std::string & new_name) const;
/// Get column old name before rename (lookup by key in rename_map)
std::string getColumnOldName(const std::string & new_name) const;
};
}

View File

@ -683,6 +683,7 @@ void DataPartStorageOnDiskBase::clearDirectory(
request.emplace_back(fs::path(dir) / "default_compression_codec.txt", true);
request.emplace_back(fs::path(dir) / "delete-on-destroy.txt", true);
request.emplace_back(fs::path(dir) / "txn_version.txt", true);
request.emplace_back(fs::path(dir) / "metadata_version.txt", true);
/// Inverted index
request.emplace_back(fs::path(dir) / "skp_idx_af.gin_dict", true);

View File

@ -63,8 +63,9 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION = 8;
// Reserved for ALTER PRIMARY KEY
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 8;
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 9;
std::string getEndpointId(const std::string & node_id)
{
@ -120,7 +121,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
MergeTreePartInfo::fromPartName(part_name, data.format_version);
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION))});
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION))});
LOG_TRACE(log, "Sending part {}", part_name);
@ -280,6 +281,10 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
&& name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
continue;
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION
&& name == IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)
continue;
files_to_replicate.insert(name);
}
@ -407,7 +412,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION)},
{"compress", "false"}
});
@ -692,7 +697,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
auto block = block_in.read();
throttler->add(block.bytes());
new_data_part->setColumns(block.getNamesAndTypesList(), {});
new_data_part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());
if (!is_projection)
{
@ -768,7 +773,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&
file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
}

View File

@ -416,10 +416,11 @@ std::pair<time_t, time_t> IMergeTreeDataPart::getMinMaxTime() const
}
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos)
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos, int32_t metadata_version_)
{
columns = new_columns;
serialization_infos = new_infos;
metadata_version = metadata_version_;
column_name_to_position.clear();
column_name_to_position.reserve(new_columns.size());
@ -660,6 +661,7 @@ void IMergeTreeDataPart::appendFilesOfColumnsChecksumsIndexes(Strings & files, b
appendFilesOfPartitionAndMinMaxIndex(files);
appendFilesOfTTLInfos(files);
appendFilesOfDefaultCompressionCodec(files);
appendFilesOfMetadataVersion(files);
}
if (!parent_part && include_projection)
@ -798,6 +800,9 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
if (getDataPartStorage().exists(TXN_VERSION_METADATA_FILE_NAME))
result.emplace(TXN_VERSION_METADATA_FILE_NAME);
if (getDataPartStorage().exists(METADATA_VERSION_FILE_NAME))
result.emplace(METADATA_VERSION_FILE_NAME);
return result;
}
@ -971,11 +976,22 @@ void IMergeTreeDataPart::removeVersionMetadata()
getDataPartStorage().removeFileIfExists("txn_version.txt");
}
/// Asks the part storage to remove the METADATA_VERSION_FILE_NAME file if it exists.
void IMergeTreeDataPart::removeMetadataVersion()
{
    auto & part_storage = getDataPartStorage();
    part_storage.removeFileIfExists(METADATA_VERSION_FILE_NAME);
}
/// Appends DEFAULT_COMPRESSION_CODEC_FILE_NAME to the end of `files`.
void IMergeTreeDataPart::appendFilesOfDefaultCompressionCodec(Strings & files)
{
    files.emplace_back(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
}
/// Appends METADATA_VERSION_FILE_NAME to the end of `files`.
void IMergeTreeDataPart::appendFilesOfMetadataVersion(Strings & files)
{
    files.emplace_back(METADATA_VERSION_FILE_NAME);
}
CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
/// In memory parts doesn't have any compression
@ -1288,8 +1304,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
NamesAndTypesList loaded_columns;
bool exists = metadata_manager->exists("columns.txt");
if (!exists)
if (!metadata_manager->exists("columns.txt"))
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::Compact)
@ -1322,16 +1337,32 @@ void IMergeTreeDataPart::loadColumns(bool require)
};
SerializationInfoByName infos(loaded_columns, settings);
exists = metadata_manager->exists(SERIALIZATION_FILE_NAME);
if (exists)
if (metadata_manager->exists(SERIALIZATION_FILE_NAME))
{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
}
setColumns(loaded_columns, infos);
int32_t loaded_metadata_version;
if (metadata_manager->exists(METADATA_VERSION_FILE_NAME))
{
auto in = metadata_manager->read(METADATA_VERSION_FILE_NAME);
readIntText(loaded_metadata_version, *in);
}
else
{
loaded_metadata_version = metadata_snapshot->getMetadataVersion();
writeMetadata(METADATA_VERSION_FILE_NAME, {}, [loaded_metadata_version](auto & buffer)
{
writeIntText(loaded_metadata_version, buffer);
});
}
setColumns(loaded_columns, infos, loaded_metadata_version);
}
/// Project part / part with project parts / compact part doesn't support LWD.
bool IMergeTreeDataPart::supportLightweightDeleteMutate() const
{

View File

@ -137,7 +137,11 @@ public:
String getTypeName() const { return getType().toString(); }
void setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos);
/// We could have separate method like setMetadata, but it's much more convenient to set it up with columns
void setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos, int32_t metadata_version_);
/// Version of metadata for part (columns, pk and so on)
int32_t getMetadataVersion() const { return metadata_version; }
const NamesAndTypesList & getColumns() const { return columns; }
const ColumnsDescription & getColumnsDescription() const { return columns_description; }
@ -308,6 +312,9 @@ public:
mutable VersionMetadata version;
/// Version of part metadata (columns, pk and so on). Managed properly only for replicated merge tree.
int32_t metadata_version;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
@ -379,8 +386,12 @@ public:
/// (number of rows, number of rows with default values, etc).
static inline constexpr auto SERIALIZATION_FILE_NAME = "serialization.json";
/// Version used for transactions.
static inline constexpr auto TXN_VERSION_METADATA_FILE_NAME = "txn_version.txt";
static inline constexpr auto METADATA_VERSION_FILE_NAME = "metadata_version.txt";
/// One of part files which is used to check how many references (I'd like
/// to say hardlinks, but it will confuse even more) we have for the part
/// for zero copy replication. Sadly it's very complex.
@ -443,7 +454,11 @@ public:
void writeDeleteOnDestroyMarker();
void removeDeleteOnDestroyMarker();
/// It may look like a stupid joke, but these two methods are absolutely unrelated.
/// This one is about removing file with metadata about part version (for transactions)
void removeVersionMetadata();
/// This one is about removing file with version of part's metadata (columns, pk and so on)
void removeMetadataVersion();
mutable std::atomic<DataPartRemovalState> removal_state = DataPartRemovalState::NOT_ATTEMPTED;
@ -582,6 +597,8 @@ private:
static void appendFilesOfDefaultCompressionCodec(Strings & files);
static void appendFilesOfMetadataVersion(Strings & files);
/// Found column without specific compression and return codec
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;

View File

@ -208,7 +208,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
infos.add(part->getSerializationInfos());
}
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos);
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos, global_ctx->metadata_snapshot->getMetadataVersion());
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)

View File

@ -4427,6 +4427,11 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExistsUnlocked(const MergeTre
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
/// Remove metadata version file and take it from table.
/// Currently we cannot attach parts with different schema, so
/// we can assume that it's equal to table's current schema.
part->removeMetadataVersion();
part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = part->getDataPartStorage().getLastModified().epochTime();
part->removeDeleteOnDestroyMarker();
@ -7649,15 +7654,23 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
AlterConversions MergeTreeData::getAlterConversionsForPart(const MergeTreeDataPartPtr part) const
{
MutationCommands commands = getFirstAlterMutationCommandsForPart(part);
std::map<int64_t, MutationCommands> commands_map = getAlterMutationCommandsForPart(part);
AlterConversions result{};
for (const auto & command : commands)
/// Currently we need explicit conversions only for RENAME alter
/// all other conversions can be deduced from diff between part columns
/// and columns in storage.
if (command.type == MutationCommand::Type::RENAME_COLUMN)
result.rename_map[command.rename_to] = command.column_name;
auto & rename_map = result.rename_map;
for (const auto & [version, commands] : commands_map)
{
for (const auto & command : commands)
{
/// Currently we need explicit conversions only for RENAME alter
/// all other conversions can be deduced from diff between part columns
/// and columns in storage.
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
rename_map.emplace_back(AlterConversions::RenamePair{command.rename_to, command.column_name});
}
}
}
return result;
}
@ -8044,7 +8057,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
if (settings->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4();
new_data_part->setColumns(columns, {});
new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->partition = partition;

View File

@ -1313,7 +1313,7 @@ protected:
/// Used to receive AlterConversions for part and apply them on fly. This
/// method has different implementations for replicated and non replicated
/// MergeTree because they store mutations in different way.
virtual MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
/// Moves part to specified space, used in ALTER ... MOVE ... queries
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

View File

@ -73,7 +73,7 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
new_data_part_storage->beginTransaction();
new_data_part->uuid = uuid;
new_data_part->setColumns(columns, {});
new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->partition.value = partition.value;
new_data_part->minmax_idx = minmax_idx;
@ -104,7 +104,7 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
.build();
new_projection_part->is_temp = false; // clean up will be done on parent part
new_projection_part->setColumns(projection->getColumns(), {});
new_projection_part->setColumns(projection->getColumns(), {}, metadata_snapshot->getMetadataVersion());
auto new_projection_part_storage = new_projection_part->getDataPartStoragePtr();
if (new_projection_part_storage->exists())

View File

@ -464,7 +464,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos);
new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
@ -586,7 +586,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos);
new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
if (new_data_part->isStoredOnDisk())
{

View File

@ -102,6 +102,15 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
if (file_size == 0 && marks_count != 0)
{
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Empty marks file '{}': {}, must be: {}",
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
file_size, expected_uncompressed_size);
}
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
@ -138,7 +147,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
}
if (i * mark_size != expected_uncompressed_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path);
{
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all marks from file {}, marks expected {} (bytes size {}), marks read {} (bytes size {})",
mrk_path, marks_count, expected_uncompressed_size, i, reader->count());
}
}
res->protect();

View File

@ -229,7 +229,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
part->minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context);
part->setColumns(block.getNamesAndTypesList(), {});
part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());
if (metadata_snapshot->hasSortingKey())
metadata_snapshot->getSortingKey().expression->execute(block);

View File

@ -175,7 +175,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
serialization_infos.replaceData(new_serialization_infos);
files_to_remove_after_sync = removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
new_part->setColumns(part_columns, serialization_infos);
new_part->setColumns(part_columns, serialization_infos, metadata_snapshot->getMetadataVersion());
}
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync);
@ -289,6 +289,14 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
written_files.emplace_back(std::move(out));
}
{
/// Write a file with the metadata version of the part.
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, write_settings);
DB::writeIntText(new_part->getMetadataVersion(), *out);
out->preFinalize();
written_files.emplace_back(std::move(out));
}
if (default_codec != nullptr)
{
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings);

View File

@ -85,7 +85,7 @@ MergedColumnOnlyOutputStream::fillChecksums(
all_checksums.files.erase(removed_file);
}
new_part->setColumns(columns, serialization_infos);
new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion());
return checksums;
}

View File

@ -53,7 +53,7 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
*/
static void splitMutationCommands(
static void splitAndModifyMutationCommands(
MergeTreeData::DataPartPtr part,
const MutationCommands & commands,
MutationCommands & for_interpreter,
@ -98,25 +98,48 @@ static void splitMutationCommands(
else
mutated_columns.emplace(command.column_name);
}
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
for_interpreter.push_back(
{
.type = MutationCommand::Type::READ_COLUMN,
.column_name = command.rename_to,
});
part_columns.rename(command.column_name, command.rename_to);
}
}
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands, instead we take them from rename_map.
/// It's important because required renames depend not only on part's data version (i.e. mutation version)
/// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter
/// can be deduced based on difference between part's schema and table schema.
for (const auto & [rename_to, rename_from] : alter_conversions.rename_map)
{
if (part_columns.has(rename_from))
{
/// Actual rename
for_interpreter.push_back(
{
.type = MutationCommand::Type::READ_COLUMN,
.column_name = rename_to,
});
/// Not needed for compact parts (not executed), added here only to produce correct
/// set of columns for new part and their serializations
for_file_renames.push_back(
{
.type = MutationCommand::Type::RENAME_COLUMN,
.column_name = rename_from,
.rename_to = rename_to
});
part_columns.rename(rename_from, rename_to);
}
}
/// If it's compact part, then we don't need to actually remove files
/// from disk we just don't read dropped columns
for (const auto & column : part->getColumns())
for (const auto & column : part_columns)
{
if (!mutated_columns.contains(column.name))
{
for_interpreter.emplace_back(
MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type});
}
}
}
else
@ -142,15 +165,59 @@ static void splitMutationCommands(
{
if (command.type == MutationCommand::Type::READ_COLUMN)
for_interpreter.push_back(command);
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
part_columns.rename(command.column_name, command.rename_to);
for_file_renames.push_back(command);
}
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands, instead we take them from rename_map.
/// It's important because required renames depend not only on part's data version (i.e. mutation version)
/// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter
/// can be deduced based on difference between part's schema and table schema.
for (const auto & [rename_to, rename_from] : alter_conversions.rename_map)
{
for_file_renames.push_back({.type = MutationCommand::Type::RENAME_COLUMN, .column_name = rename_from, .rename_to = rename_to});
}
}
}
/// Collapses chains of renames (a -> b, then b -> c) into one command (a -> c).
/// It's legal to squash renames because commands with rename are always "barrier"
/// and executed separately from other types of commands.
///
/// The command type is not inspected here: every command is keyed by its column_name
/// and carries its rename_to. A command whose column_name equals the current target
/// of an earlier entry extends that entry instead of creating its own, and is
/// therefore absent from the result.
static MutationCommands squashRenamesInCommands(const MutationCommands & commands)
{
    /// First name in a chain -> last name in that chain.
    NameToNameMap chain_ends;

    for (const auto & command : commands)
    {
        const std::string & target = command.rename_to;

        bool extended_existing_chain = false;
        for (auto & entry : chain_ends)
        {
            if (entry.second != command.column_name)
                continue;

            /// This command continues a chain that already ends at its column_name.
            entry.second = target;
            extended_existing_chain = true;
            break;
        }

        if (!extended_existing_chain)
            chain_ends[command.column_name] = target;
    }

    /// Keep only the commands that start a chain, rewritten to point at the chain's final name.
    MutationCommands result;
    for (const auto & command : commands)
    {
        auto found = chain_ends.find(command.column_name);
        if (found == chain_ends.end())
            continue;

        result.push_back(command);
        result.back().rename_to = found->second;
    }

    return result;
}
/// Get the columns list of the resulting part in the same order as storage_columns.
static std::pair<NamesAndTypesList, SerializationInfoByName>
getColumnsForNewDataPart(
@ -158,8 +225,13 @@ getColumnsForNewDataPart(
const Block & updated_header,
NamesAndTypesList storage_columns,
const SerializationInfoByName & serialization_infos,
const MutationCommands & commands_for_interpreter,
const MutationCommands & commands_for_removes)
{
MutationCommands all_commands;
all_commands.insert(all_commands.end(), commands_for_interpreter.begin(), commands_for_interpreter.end());
all_commands.insert(all_commands.end(), commands_for_removes.begin(), commands_for_removes.end());
NameSet removed_columns;
NameToNameMap renamed_columns_to_from;
NameToNameMap renamed_columns_from_to;
@ -175,8 +247,9 @@ getColumnsForNewDataPart(
storage_columns.emplace_back(column);
}
/// All commands are validated in AlterCommand so we don't care about order
for (const auto & command : commands_for_removes)
MutationCommands squashed_commands = squashRenamesInCommands(all_commands);
for (const auto & command : squashed_commands)
{
if (command.type == MutationCommand::UPDATE)
{
@ -269,20 +342,38 @@ getColumnsForNewDataPart(
/// should it's previous version should be dropped or removed
if (renamed_columns_to_from.contains(it->name) && !was_renamed && !was_removed)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect mutation commands, trying to rename column {} to {}, "
"but part {} already has column {}",
renamed_columns_to_from[it->name], it->name, source_part->name, it->name);
ErrorCodes::LOGICAL_ERROR,
"Incorrect mutation commands, trying to rename column {} to {}, "
"but part {} already has column {}",
renamed_columns_to_from[it->name], it->name, source_part->name, it->name);
/// Column was renamed and no other column renamed to it's name
/// or column is dropped.
if (!renamed_columns_to_from.contains(it->name) && (was_renamed || was_removed))
{
it = storage_columns.erase(it);
}
else
{
/// Take a type from source part column.
/// It may differ from column type in storage.
it->type = source_col->second;
if (was_removed)
{ /// DROP COLUMN xxx, RENAME COLUMN yyy TO xxx
auto renamed_from = renamed_columns_to_from.at(it->name);
auto maybe_name_and_type = source_columns.tryGetByName(renamed_from);
if (!maybe_name_and_type)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Got incorrect mutation commands, column {} was renamed from {}, but it doesn't exist in source columns {}",
it->name, renamed_from, source_columns.toString());
it->type = maybe_name_and_type->type;
}
else
{
/// Take a type from source part column.
/// It may differ from column type in storage.
it->type = source_col->second;
}
++it;
}
}
@ -548,27 +639,36 @@ static NameToNameVector collectFilesForRenames(
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
auto stream_counts = getStreamCounts(source_part, source_part->getColumns().getNames());
NameToNameVector rename_vector;
NameSet collected_names;
auto add_rename = [&rename_vector, &collected_names] (const std::string & file_rename_from, const std::string & file_rename_to)
{
if (collected_names.emplace(file_rename_from).second)
rename_vector.emplace_back(file_rename_from, file_rename_to);
};
MutationCommands squashed_commands = squashRenamesInCommands(commands_for_removes);
/// Remove old data
for (const auto & command : commands_for_removes)
for (const auto & command : squashed_commands)
{
if (command.type == MutationCommand::Type::DROP_INDEX)
{
if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx2"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx2", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
add_rename(INDEX_FILE_PREFIX + command.column_name + ".idx2", "");
add_rename(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
}
else if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
add_rename(INDEX_FILE_PREFIX + command.column_name + ".idx", "");
add_rename(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
}
}
else if (command.type == MutationCommand::Type::DROP_PROJECTION)
{
if (source_part->checksums.has(command.column_name + ".proj"))
rename_vector.emplace_back(command.column_name + ".proj", "");
add_rename(command.column_name + ".proj", "");
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
@ -578,8 +678,8 @@ static NameToNameVector collectFilesForRenames(
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
rename_vector.emplace_back(stream_name + ".bin", "");
rename_vector.emplace_back(stream_name + mrk_extension, "");
add_rename(stream_name + ".bin", "");
add_rename(stream_name + mrk_extension, "");
}
};
@ -598,8 +698,8 @@ static NameToNameVector collectFilesForRenames(
if (stream_from != stream_to)
{
rename_vector.emplace_back(stream_from + ".bin", stream_to + ".bin");
rename_vector.emplace_back(stream_from + mrk_extension, stream_to + mrk_extension);
add_rename(stream_from + ".bin", stream_to + ".bin");
add_rename(stream_from + mrk_extension, stream_to + mrk_extension);
}
};
@ -619,8 +719,8 @@ static NameToNameVector collectFilesForRenames(
{
if (!new_streams.contains(old_stream) && --stream_counts[old_stream] == 0)
{
rename_vector.emplace_back(old_stream + ".bin", "");
rename_vector.emplace_back(old_stream + mrk_extension, "");
add_rename(old_stream + ".bin", "");
add_rename(old_stream + mrk_extension, "");
}
}
}
@ -637,6 +737,7 @@ void finalizeMutatedPart(
ExecuteTTLType execute_ttl_type,
const CompressionCodecPtr & codec,
ContextPtr context,
StorageMetadataPtr metadata_snapshot,
bool sync)
{
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
@ -685,6 +786,12 @@ void finalizeMutatedPart(
written_files.push_back(std::move(out_comp));
}
{
auto out_metadata = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
written_files.push_back(std::move(out_metadata));
}
{
/// Write a file with a description of columns.
auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
@ -1324,13 +1431,27 @@ private:
ctx->new_data_part->storeVersionMetadata();
NameSet hardlinked_files;
/// NOTE: Renames must be done in order
for (const auto & [rename_from, rename_to] : ctx->files_to_rename)
{
if (rename_to.empty()) /// It's DROP COLUMN
{
/// pass
}
else
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), rename_from, rename_to);
hardlinked_files.insert(rename_from);
}
}
/// Create hardlinks for unchanged files
for (auto it = ctx->source_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (ctx->files_to_skip.contains(it->name()))
continue;
String destination;
String file_name = it->name();
auto rename_it = std::find_if(ctx->files_to_rename.begin(), ctx->files_to_rename.end(), [&file_name](const auto & rename_pair)
@ -1340,20 +1461,17 @@ private:
if (rename_it != ctx->files_to_rename.end())
{
if (rename_it->second.empty())
continue;
destination = rename_it->second;
}
else
{
destination = it->name();
/// RENAMEs and DROPs already processed
continue;
}
String destination = it->name();
if (it->isFile())
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), it->name(), destination);
hardlinked_files.insert(it->name());
ctx->source_part->getDataPartStorage(), file_name, destination);
hardlinked_files.insert(file_name);
}
else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{
@ -1449,7 +1567,7 @@ private:
}
}
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context, ctx->need_sync);
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context, ctx->metadata_snapshot, ctx->need_sync);
}
@ -1611,7 +1729,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
MutationHelpers::splitMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames);
MutationHelpers::splitAndModifyMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);
@ -1656,9 +1774,9 @@ bool MutateTask::prepare()
auto [new_columns, new_infos] = MutationHelpers::getColumnsForNewDataPart(
ctx->source_part, ctx->updated_header, ctx->storage_columns,
ctx->source_part->getSerializationInfos(), ctx->commands_for_part);
ctx->source_part->getSerializationInfos(), ctx->for_interpreter, ctx->for_file_renames);
ctx->new_data_part->setColumns(new_columns, new_infos);
ctx->new_data_part->setColumns(new_columns, new_infos, ctx->metadata_snapshot->getMetadataVersion());
ctx->new_data_part->partition.assign(ctx->source_part->partition);
/// Don't change granularity type while mutating subset of columns

View File

@ -149,7 +149,7 @@ void ReplicatedMergeTreeAttachThread::runImpl()
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
if (replica_metadata_version_exists)
{
storage.metadata_version = parse<int>(replica_metadata_version);
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse<int>(replica_metadata_version)));
}
else
{

View File

@ -11,6 +11,7 @@
#include <Parsers/formatAST.h>
#include <base/sort.h>
#include <ranges>
namespace DB
{
@ -1754,19 +1755,40 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
}
MutationCommands ReplicatedMergeTreeQueue::getFirstAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
std::lock_guard lock(state_mutex);
std::unique_lock lock(state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
if (in_partition == mutations_by_partition.end())
return MutationCommands{};
return {};
Int64 part_version = part->info.getDataVersion();
for (auto [mutation_version, mutation_status] : in_partition->second)
if (mutation_version > part_version && mutation_status->entry->alter_version != -1)
return mutation_status->entry->commands;
Int64 part_metadata_version = part->getMetadataVersion();
std::map<int64_t, MutationCommands> result;
/// Here we return mutation commands whose alter version is bigger than the part's metadata version.
/// Please note that we don't use getDataVersion(): these alter commands are used for on-the-fly conversions
/// of the part's metadata.
for (const auto & [mutation_version, mutation_status] : in_partition->second | std::views::reverse)
{
int32_t alter_version = mutation_status->entry->alter_version;
if (alter_version != -1)
{
if (!alter_sequence.canExecuteDataAlter(alter_version, lock))
continue;
return MutationCommands{};
/// we take commands with bigger metadata version
if (alter_version > part_metadata_version)
{
result[mutation_version] = mutation_status->entry->commands;
}
else
{
/// Entries are ordered and we process them in reverse order, so we can break here.
break;
}
}
}
return result;
}
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
@ -1808,7 +1830,18 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
MutationCommands commands;
for (auto it = begin; it != end; ++it)
commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end());
{
const auto & commands_from_entry = it->second->entry->commands;
if (commands_from_entry.containBarrierCommand())
{
if (commands.empty())
commands.insert(commands.end(), commands_from_entry.begin(), commands_from_entry.end());
break;
}
else
commands.insert(commands.end(), commands_from_entry.begin(), commands_from_entry.end());
}
return commands;
}

View File

@ -393,10 +393,10 @@ public:
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const;
/// Return mutation commands for part with smallest mutation version bigger
/// than data part version. Used when we apply alter commands on fly,
/// Return alter mutation commands which may not have been applied to the part yet,
/// judging by the part's metadata version. Used when we apply alter commands on the fly,
/// without actual data modification on disk.
MutationCommands getFirstAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
/// Mark finished mutations as done. If the function needs to be called again at some later time
/// (because some mutations are probably done but we are not sure yet), returns true.

View File

@ -23,6 +23,12 @@ namespace ErrorCodes
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
}
bool MutationCommand::isBarrierCommand() const
{
    /// RENAME_COLUMN is the only command type treated as a barrier here.
    const bool is_rename_column = (type == RENAME_COLUMN);
    return is_rename_column;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool parse_alter_commands)
{
if (command->type == ASTAlterCommand::DELETE)
@ -212,4 +218,14 @@ bool MutationCommands::hasNonEmptyMutationCommands() const
return false;
}
bool MutationCommands::containBarrierCommand() const
{
for (const auto & command : *this)
{
if (command.isBarrierCommand())
return true;
}
return false;
}
}

View File

@ -67,6 +67,9 @@ struct MutationCommand
/// If parse_alter_commands, then consider more Alter commands as mutation commands
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool parse_alter_commands = false);
/// This command shouldn't stick with other commands
bool isBarrierCommand() const;
};
/// Multiple mutation commands, possible from different ALTER queries
@ -79,6 +82,11 @@ public:
void readText(ReadBuffer & in);
std::string toString() const;
bool hasNonEmptyMutationCommands() const;
/// This set of commands contains a barrier command and shouldn't
/// stick with other commands. Commands from one set have already been validated
/// to be executed without issues on the creation state.
bool containBarrierCommand() const;
};
using MutationCommandsConstPtr = std::shared_ptr<MutationCommands>;

View File

@ -41,6 +41,7 @@ StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata &
, settings_changes(other.settings_changes ? other.settings_changes->clone() : nullptr)
, select(other.select)
, comment(other.comment)
, metadata_version(other.metadata_version)
{
}
@ -69,6 +70,7 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo
settings_changes.reset();
select = other.select;
comment = other.comment;
metadata_version = other.metadata_version;
return *this;
}
@ -122,6 +124,18 @@ void StorageInMemoryMetadata::setSelectQuery(const SelectQueryDescription & sele
select = select_;
}
void StorageInMemoryMetadata::setMetadataVersion(int32_t metadata_version_)
{
    /// Overwrite the stored version; no validation is performed on the new value.
    this->metadata_version = metadata_version_;
}
StorageInMemoryMetadata StorageInMemoryMetadata::withMetadataVersion(int32_t metadata_version_) const
{
    /// Leave this object untouched: copy it and change the version only in the copy.
    auto result = *this;
    result.metadata_version = metadata_version_;
    return result;
}
const ColumnsDescription & StorageInMemoryMetadata::getColumns() const
{
return columns;

View File

@ -50,6 +50,10 @@ struct StorageInMemoryMetadata
String comment;
/// Version of metadata. Managed properly by ReplicatedMergeTree only
/// (zero-initialization is important)
int32_t metadata_version = 0;
StorageInMemoryMetadata() = default;
StorageInMemoryMetadata(const StorageInMemoryMetadata & other);
@ -58,7 +62,7 @@ struct StorageInMemoryMetadata
StorageInMemoryMetadata(StorageInMemoryMetadata && other) = default;
StorageInMemoryMetadata & operator=(StorageInMemoryMetadata && other) = default;
/// NOTE: Thread unsafe part. You should modify same StorageInMemoryMetadata
/// NOTE: Thread unsafe part. You should not modify same StorageInMemoryMetadata
/// structure from different threads. It should be used as MultiVersion
/// object. See example in IStorage.
@ -90,6 +94,11 @@ struct StorageInMemoryMetadata
/// Set SELECT query for (Materialized)View
void setSelectQuery(const SelectQueryDescription & select_);
/// Set version of metadata.
void setMetadataVersion(int32_t metadata_version_);
/// Get copy of current metadata with metadata_version_
StorageInMemoryMetadata withMetadataVersion(int32_t metadata_version_) const;
/// Returns combined set of columns
const ColumnsDescription & getColumns() const;
@ -218,6 +227,9 @@ struct StorageInMemoryMetadata
const SelectQueryDescription & getSelectQuery() const;
bool hasSelectQuery() const;
/// Get version of metadata
int32_t getMetadataVersion() const { return metadata_version; }
/// Check that all the requested names are in the table and have the correct types.
void check(const NamesAndTypesList & columns) const;

View File

@ -1143,9 +1143,24 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end());
last_mutation_to_apply = it;
const auto & single_mutation_commands = it->second.commands;
if (single_mutation_commands.containBarrierCommand())
{
if (commands->empty())
{
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
last_mutation_to_apply = it;
}
break;
}
else
{
current_ast_elements += commands_size;
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
last_mutation_to_apply = it;
}
}
assert(commands->empty() == (last_mutation_to_apply == mutations_end_it));
@ -1240,7 +1255,10 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
}
if (mutate_entry)
{
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, metadata_snapshot, mutate_entry, shared_lock, common_assignee_trigger);
/// We take a new metadata snapshot here, because mutation commands can be executed only with a metadata snapshot
/// which is equal to or fresher than the commands themselves. In an extremely rare case an alter can happen
/// between taking the snapshot above and selecting the commands. That is why we take a new snapshot here.
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, getInMemoryMetadataPtr(), mutate_entry, shared_lock, common_assignee_trigger);
assignee.scheduleMergeMutateTask(task);
return true;
}
@ -2109,14 +2127,22 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
}
MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const
std::map<int64_t, MutationCommands> StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (it == current_mutations_by_version.end())
return {};
return it->second.commands;
Int64 part_data_version = part->info.getDataVersion();
std::map<int64_t, MutationCommands> result;
if (!current_mutations_by_version.empty())
{
const auto & [latest_mutation_id, latest_commands] = *current_mutations_by_version.rbegin();
if (part_data_version < static_cast<int64_t>(latest_mutation_id))
{
result[latest_mutation_id] = latest_commands.commands;
}
}
return result;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()

View File

@ -267,7 +267,7 @@ private:
protected:
MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const override;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
};
}

View File

@ -461,7 +461,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
Coordination::Stat metadata_stat;
current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
metadata_version = metadata_stat.version;
setInMemoryMetadata(metadata_snapshot->withMetadataVersion(metadata_stat.version));
}
catch (Coordination::Exception & e)
{
@ -781,7 +781,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(metadata_snapshot->getMetadataVersion()),
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
@ -854,7 +854,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(metadata_snapshot->getMetadataVersion()),
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
@ -1159,16 +1159,19 @@ void StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_pr
}
void StorageReplicatedMergeTree::setTableStructure(const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff)
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff, int32_t new_metadata_version)
{
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
StorageInMemoryMetadata new_metadata = metadata_diff.getNewMetadata(new_columns, local_context, old_metadata);
new_metadata.setMetadataVersion(new_metadata_version);
/// Even if the primary/sorting/partition keys didn't change we must reinitialize it
/// because primary/partition key column types might have changed.
checkTTLExpressions(new_metadata, old_metadata);
setProperties(new_metadata, old_metadata);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
@ -2782,8 +2785,9 @@ void StorageReplicatedMergeTree::cloneMetadataIfNeeded(const String & source_rep
return;
}
auto metadata_snapshot = getInMemoryMetadataPtr();
Int32 source_metadata_version = parse<Int32>(source_metadata_version_str);
if (metadata_version == source_metadata_version)
if (metadata_snapshot->getMetadataVersion() == source_metadata_version)
return;
/// Our metadata is not up to date with source replica metadata.
@ -2801,7 +2805,7 @@ void StorageReplicatedMergeTree::cloneMetadataIfNeeded(const String & source_rep
/// if all such entries were cleaned up from the log and source_queue.
LOG_WARNING(log, "Metadata version ({}) on replica is not up to date with metadata ({}) on source replica {}",
metadata_version, source_metadata_version, source_replica);
metadata_snapshot->getMetadataVersion(), source_metadata_version, source_replica);
String source_metadata;
String source_columns;
@ -4980,14 +4984,15 @@ bool StorageReplicatedMergeTree::optimize(
bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMergeTree::LogEntry & entry)
{
if (entry.alter_version < metadata_version)
auto current_metadata = getInMemoryMetadataPtr();
if (entry.alter_version < current_metadata->getMetadataVersion())
{
/// TODO Can we replace it with LOGICAL_ERROR?
/// As for now, it may rarely happen due to reordering of ALTER_METADATA entries in the queue of
/// non-initial replica and also may happen after stale replica recovery.
LOG_WARNING(log, "Attempt to update metadata of version {} "
"to older version {} when processing log entry {}: {}",
metadata_version, entry.alter_version, entry.znode_name, entry.toString());
current_metadata->getMetadataVersion(), entry.alter_version, entry.znode_name, entry.toString());
return true;
}
@ -5035,10 +5040,10 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this, getInMemoryMetadataPtr()).checkAndFindDiff(metadata_from_entry, getInMemoryMetadataPtr()->getColumns(), getContext());
setTableStructure(table_id, alter_context, std::move(columns_from_entry), metadata_diff);
metadata_version = entry.alter_version;
setTableStructure(table_id, alter_context, std::move(columns_from_entry), metadata_diff, entry.alter_version);
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: {}", metadata_version);
current_metadata = getInMemoryMetadataPtr();
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: {}", current_metadata->getMetadataVersion());
}
{
@ -5050,7 +5055,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
/// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node
/// TODO Maybe do it in one transaction for Replicated database?
zookeeper->createOrUpdate(fs::path(replica_path) / "metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent);
zookeeper->createOrUpdate(fs::path(replica_path) / "metadata_version", std::to_string(current_metadata->getMetadataVersion()), zkutil::CreateMode::Persistent);
return true;
}
@ -5174,7 +5179,7 @@ void StorageReplicatedMergeTree::alter(
size_t mutation_path_idx = std::numeric_limits<size_t>::max();
String new_metadata_str = future_metadata_in_zk.toString();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version));
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion()));
String new_columns_str = future_metadata.columns.toString();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1));
@ -5190,7 +5195,7 @@ void StorageReplicatedMergeTree::alter(
/// We can be sure that in case of a successful commit in ZooKeeper our
/// version will be incremented by 1, because we update with a version check.
int new_metadata_version = metadata_version + 1;
int new_metadata_version = current_metadata->getMetadataVersion() + 1;
alter_entry->type = LogEntry::ALTER_METADATA;
alter_entry->source_replica = replica_name;
@ -7964,9 +7969,9 @@ bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
}
MutationCommands StorageReplicatedMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const
std::map<int64_t, MutationCommands> StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
return queue.getFirstAlterMutationCommandsForPart(part);
return queue.getAlterMutationCommandsForPart(part);
}

View File

@ -215,8 +215,6 @@ public:
/// It's used if not set in engine's arguments while creating a replicated table.
static String getDefaultReplicaName(const ContextPtr & context_);
int getMetadataVersion() const { return metadata_version; }
/// Modify a CREATE TABLE query to make a variant which must be written to a backup.
void adjustCreateQueryForBackup(ASTPtr & create_query) const override;
@ -428,7 +426,6 @@ private:
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
int metadata_version = 0;
/// Threads.
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
@ -505,8 +502,10 @@ private:
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
/// Must be called under IStorage::lockForAlter() lock.
void setTableStructure(const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff);
void setTableStructure(
const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff,
int32_t new_metadata_version);
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
* If any parts described in ZK are not locally, throw an exception.
@ -833,7 +832,7 @@ private:
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const override;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;

View File

@ -43,8 +43,18 @@ def create_table(cluster, table_name, additional_settings=None):
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
@pytest.fixture(scope="module")

View File

@ -52,8 +52,18 @@ def cluster():
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
def create_table(node, table_name, **additional_settings):
@ -232,7 +242,6 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
def test_alter_table_columns(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))

View File

@ -89,7 +89,7 @@ def drop_table(cluster):
# S3 request will be failed for an appropriate part file write.
FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_BASE = 6 # partition.dat, metadata_version.txt, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_WIDE = (
FILES_PER_PART_BASE + 1 + 1 + 3 * 2
) # Primary index, MinMax, Mark and data file for column(s)

View File

@ -105,6 +105,8 @@ def partition_complex_assert_checksums():
"c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition_complex/19700102_2_2_0/count.txt\n"
"c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition_complex/19700201_1_1_0/count.txt\n"
"cfcb770c3ecd0990dcceb1bde129e6c6\tshadow/1/data/test/partition_complex/19700102_2_2_0/p.bin\n"
"cfcd208495d565ef66e7dff9f98764da\tshadow/1/data/test/partition_complex/19700102_2_2_0/metadata_version.txt\n"
"cfcd208495d565ef66e7dff9f98764da\tshadow/1/data/test/partition_complex/19700201_1_1_0/metadata_version.txt\n"
"e2af3bef1fd129aea73a890ede1e7a30\tshadow/1/data/test/partition_complex/19700201_1_1_0/k.bin\n"
"f2312862cc01adf34a93151377be2ddf\tshadow/1/data/test/partition_complex/19700201_1_1_0/minmax_p.idx\n"
)

View File

@ -44,8 +44,18 @@ def cluster():
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
def random_string(length):

View File

@ -47,8 +47,18 @@ def cluster():
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
def random_string(length):

View File

@ -86,9 +86,9 @@ def test_ttl_move_and_s3(started_cluster):
print(f"Total objects: {counter}")
if counter == 300:
if counter == 330:
break
print(f"Attempts remaining: {attempt}")
assert counter == 300
assert counter == 330

View File

@ -1,7 +1,7 @@
CREATE TABLE default.rename_table\n(\n `key` Int32,\n `old_value1` Int32,\n `value1` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table\n(\n `key` Int32,\n `old_value1` Int32,\n `value1` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key old_value1 value1
1 2 3
CREATE TABLE default.rename_table\n(\n `k` Int32,\n `v1` Int32,\n `v2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table\n(\n `k` Int32,\n `v1` Int32,\n `v2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
k v1 v2
1 2 3
4 5 6

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS rename_table;
CREATE TABLE rename_table (key Int32, value1 Int32, value2 Int32) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE rename_table (key Int32, value1 Int32, value2 Int32) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part=0;
INSERT INTO rename_table VALUES (1, 2, 3);

View File

@ -1,11 +1,11 @@
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key value1_string value2
1 2 3
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int32,\n `value2` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int32,\n `value2` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key value1_string value2_old value2
1 2 3 7
4 5 6 7
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key value1_string value2_old
1 2 7
4 5 7

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS rename_table_multiple;
CREATE TABLE rename_table_multiple (key Int32, value1 String, value2 Int32) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE rename_table_multiple (key Int32, value1 String, value2 Int32) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part=0;
INSERT INTO rename_table_multiple VALUES (1, 2, 3);

View File

@ -7,25 +7,25 @@ file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
7
7
8
8
0
2
2
7
8
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
7
7
7
7
21
31
38
8
8
8
8
24
35
43
5010500
18816
Using storage policy: local_cache
@ -37,24 +37,24 @@ file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
7
7
8
8
0
2
2
7
8
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
7
7
7
7
21
31
38
8
8
8
8
24
35
43
5010500
18816

View File

@ -44,8 +44,8 @@ for i in {1..100}; do
")"
# Non retriable errors
if [[ $FileSync -ne 7 ]]; then
echo "FileSync: $FileSync != 11" >&2
if [[ $FileSync -ne 8 ]]; then
echo "FileSync: $FileSync != 8" >&2
exit 2
fi
# Check that all files were synced

View File

@ -0,0 +1,8 @@
1 2 3
4 5 6
{"column1_renamed":"1","column2_renamed":"2","column3":"3"}
{"column1_renamed":"4","column2_renamed":"5","column3":"6"}
1 2 3
4 5 6
{"column1_renamed":"1","column2_renamed":"2","column3":"3"}
{"column1_renamed":"4","column2_renamed":"5","column3":"6"}

View File

@ -0,0 +1,59 @@
-- Checks that SELECT keeps working on a ReplicatedMergeTree table when a RENAME COLUMN
-- alter is issued while the replication queue is stopped, and that a second rename
-- can be applied once the queue is restarted and synced.
-- The same scenario is run twice: with default part settings and with
-- min_bytes_for_wide_part = 0.

DROP TABLE IF EXISTS wrong_metadata;
CREATE TABLE wrong_metadata(
    column1 UInt64,
    column2 UInt64,
    column3 UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/wrong_metadata', '1')
ORDER BY tuple();
INSERT INTO wrong_metadata VALUES (1, 2, 3);
-- Stop the queue before the alter; replication_alter_partitions_sync = 0 means
-- the ALTER query does not wait for the rename to be executed.
SYSTEM STOP REPLICATION QUEUES wrong_metadata;
ALTER TABLE wrong_metadata RENAME COLUMN column1 TO column1_renamed SETTINGS replication_alter_partitions_sync = 0;
INSERT INTO wrong_metadata VALUES (4, 5, 6);
-- At this point the table is still queried by the old column name.
SELECT * FROM wrong_metadata ORDER BY column1;
SYSTEM START REPLICATION QUEUES wrong_metadata;
SYSTEM SYNC REPLICA wrong_metadata;
ALTER TABLE wrong_metadata RENAME COLUMN column2 to column2_renamed SETTINGS replication_alter_partitions_sync = 2;
SELECT * FROM wrong_metadata ORDER BY column1_renamed FORMAT JSONEachRow;
DROP TABLE IF EXISTS wrong_metadata;

-- Same scenario with min_bytes_for_wide_part = 0.
-- Drop a possible leftover from a previous aborted run, as is done for the first table.
DROP TABLE IF EXISTS wrong_metadata_wide;
CREATE TABLE wrong_metadata_wide(
    column1 UInt64,
    column2 UInt64,
    column3 UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/wrong_metadata_wide', '1')
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO wrong_metadata_wide VALUES (1, 2, 3);
SYSTEM STOP REPLICATION QUEUES wrong_metadata_wide;
ALTER TABLE wrong_metadata_wide RENAME COLUMN column1 TO column1_renamed SETTINGS replication_alter_partitions_sync = 0;
INSERT INTO wrong_metadata_wide VALUES (4, 5, 6);
SELECT * FROM wrong_metadata_wide ORDER by column1;
SYSTEM START REPLICATION QUEUES wrong_metadata_wide;
SYSTEM SYNC REPLICA wrong_metadata_wide;
ALTER TABLE wrong_metadata_wide RENAME COLUMN column2 to column2_renamed SETTINGS replication_alter_partitions_sync = 2;
SELECT * FROM wrong_metadata_wide ORDER BY column1_renamed FORMAT JSONEachRow;
DROP TABLE IF EXISTS wrong_metadata_wide;

View File

@ -0,0 +1 @@
{"v":"1","v2":"77"}

View File

@ -0,0 +1,58 @@
#!/usr/bin/env bash
# Test: an UPDATE mutation that references the new column name is submitted
# after a RENAME COLUMN on a MergeTree table while merges are stopped.
# Both ALTER queries run as background clients; merges are then started
# and the final table contents are printed as JSONEachRow.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_to_rename"
$CLICKHOUSE_CLIENT --query="CREATE TABLE table_to_rename(v UInt64, v1 UInt64)ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0"
$CLICKHOUSE_CLIENT --query="INSERT INTO table_to_rename VALUES (1, 1)"
# We want the following mutations to get stuck.
# That is why we stop merges and wait in loops until they have actually been submitted.
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES table_to_rename"
# Run the rename in a background client so the script can continue.
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename RENAME COLUMN v1 to v2" &
# Poll (at most $retries attempts, 0.1 s apart) until the table definition contains "v2".
counter=0 retries=60
I=0
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "show create table table_to_rename")
if [[ $result == *"v2"* ]]; then
break;
fi
sleep 0.1
((++counter))
done
# Submit the UPDATE on the renamed column, also in a background client.
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename UPDATE v2 = 77 WHERE 1 = 1 SETTINGS mutations_sync = 2" &
# Poll until system.mutations reports two mutations for this table.
counter=0 retries=60
I=0
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SELECT count() from system.mutations where database='${CLICKHOUSE_DATABASE}' and table='table_to_rename'")
if [[ $result == "2" ]]; then
break;
fi
sleep 0.1
((++counter))
done
# Start merges again and wait for both background ALTER clients to finish.
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES table_to_rename"
wait
$CLICKHOUSE_CLIENT --query="SELECT * FROM table_to_rename FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_to_rename"

View File

@ -0,0 +1,26 @@
{"a1":"1","b1":"2","c":"3"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
{"a1":"7","b1":"8","c":"9"}
~~~~~~~
{"b":"1","a":"2","c":"3"}
{"b":"4","a":"5","c":"6"}
{"b":"7","a":"8","c":"9"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
{"a1":"7","b1":"8","c":"9"}
~~~~~~~
{"b":"1","a":"2","c":"3"}
{"b":"4","a":"5","c":"6"}
{"b":"7","a":"8","c":"9"}
~~~~~~~

View File

@ -0,0 +1,143 @@
#!/usr/bin/env bash
# Tags: replica

# Scenario: on a ReplicatedMergeTree table with merges stopped, two consecutive
# multi-column renames are issued (a -> a1, b -> b1, then a1 -> b, b1 -> a) with rows
# inserted in between. The table is printed after every step and once more after
# merges are re-enabled and the replica is synced. The same sequence runs twice with
# different min_bytes_for_wide_part values (0, then 10000000 for the *_compact table).

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# Re-runs the query $1 every 0.1 seconds until its output contains the substring $2.
# Gives up silently after 60 attempts and lets the caller continue.
function wait_for_substring()
{
    local query=$1
    local needle=$2
    local attempt result

    for ((attempt = 0; attempt < 60; ++attempt)); do
        result=$($CLICKHOUSE_CLIENT --query "$query")
        if [[ $result == *"$needle"* ]]; then
            return 0
        fi
        sleep 0.1
    done
}

# Runs the whole rename sequence against one table.
#   $1 - table name (also used as the last component of the ZooKeeper path)
#   $2 - value for the min_bytes_for_wide_part table setting
function run_rename_sequence()
{
    local table=$1
    local min_bytes_for_wide_part=$2

    $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ${table}"

    $CLICKHOUSE_CLIENT -n --query="CREATE TABLE ${table}(
        a UInt64,
        b UInt64,
        c UInt64
    )
    ENGINE ReplicatedMergeTree('/test/{database}/tables/${table}', '1')
    ORDER BY tuple()
    SETTINGS min_bytes_for_wide_part = ${min_bytes_for_wide_part}"

    $CLICKHOUSE_CLIENT --query="INSERT INTO ${table} VALUES (1, 2, 3)"

    $CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES ${table}"

    # replication_alter_partitions_sync = 0: the client does not wait for the ALTER,
    # so poll the table definition until the first rename becomes visible.
    $CLICKHOUSE_CLIENT --query="ALTER TABLE ${table} RENAME COLUMN a TO a1, RENAME COLUMN b to b1 SETTINGS replication_alter_partitions_sync = 0"

    wait_for_substring "SHOW CREATE TABLE ${table}" "\`a1\` UInt64"

    $CLICKHOUSE_CLIENT --query="SELECT * FROM ${table} ORDER BY a1 FORMAT JSONEachRow"
    $CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"

    $CLICKHOUSE_CLIENT --query="INSERT INTO ${table} VALUES (4, 5, 6)"

    $CLICKHOUSE_CLIENT --query="SELECT * FROM ${table} ORDER BY a1 FORMAT JSONEachRow"
    $CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"

    # The second rename swaps the names back onto the opposite columns. Here the
    # script waits only until the rename shows up in system.mutations.
    $CLICKHOUSE_CLIENT --query="ALTER TABLE ${table} RENAME COLUMN a1 TO b, RENAME COLUMN b1 to a SETTINGS replication_alter_partitions_sync = 0"

    wait_for_substring "SELECT * FROM system.mutations WHERE table = '${table}' AND database='${CLICKHOUSE_DATABASE}'" "b1 TO a"

    $CLICKHOUSE_CLIENT --query="INSERT INTO ${table} VALUES (7, 8, 9)"

    $CLICKHOUSE_CLIENT --query="SELECT * FROM ${table} ORDER by a1 FORMAT JSONEachRow"
    $CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"

    $CLICKHOUSE_CLIENT --query="SYSTEM START MERGES ${table}"

    $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA ${table}"

    $CLICKHOUSE_CLIENT --query="SELECT * FROM ${table} order by a FORMAT JSONEachRow"
    $CLICKHOUSE_CLIENT --query="SELECT '~~~~~~~'"

    $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ${table}"
}

run_rename_sequence wrong_metadata 0
run_rename_sequence wrong_metadata_compact 10000000