Removed obsolete code that was relevant only in extremely rare case of transforming CollapsingMergeTree to ReplicatedCollapsingMergeTree through ATTACHing data parts and while some considerations about data order in table is important [#CLICKHOUSE-3091].

This commit is contained in:
Alexey Milovidov 2017-06-25 03:51:51 +03:00
parent 3c660fd65b
commit 8228acd3ea
11 changed files with 57 additions and 254 deletions

View File

@ -38,8 +38,6 @@ public:
bool contains(const Part & rhs) const
{
return month == rhs.month /// Parts for different months are not merged
&& left_date <= rhs.left_date
&& right_date >= rhs.right_date
&& left <= rhs.left
&& right >= rhs.right
&& level >= rhs.level;

View File

@ -1445,34 +1445,6 @@ size_t MergeTreeData::getMaxPartsCountForMonth() const
}
std::pair<Int64, bool> MergeTreeData::getMinBlockNumberForMonth(DayNum_t month) const
{
std::lock_guard<std::mutex> lock(all_data_parts_mutex);
for (const auto & part : all_data_parts) /// The search can be done better.
if (part->month == month)
return { part->left, true }; /// Blocks in data_parts are sorted by month and left.
return { 0, false };
}
bool MergeTreeData::hasBlockNumberInMonth(Int64 block_number, DayNum_t month) const
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
for (const auto & part : data_parts) /// The search can be done better.
{
if (part->month == month && part->left <= block_number && part->right >= block_number)
return true;
if (part->month > month)
break;
}
return false;
}
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
const size_t parts_count = getMaxPartsCountForMonth();

View File

@ -296,12 +296,6 @@ public:
size_t getMaxPartsCountForMonth() const;
/// Returns a pair of min block number in the given month and a bool denoting if there is at least one part.
std::pair<Int64, bool> getMinBlockNumberForMonth(DayNum_t month) const;
/// Returns true if block_number is contained in some part of the given month.
bool hasBlockNumberInMonth(Int64 block_number, DayNum_t month) const;
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertIfNeeded(Poco::Event * until = nullptr);

View File

@ -139,7 +139,26 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
}
void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, String block_id)
void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
/// NOTE No delay in this case. That's Ok.
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
if (quorum)
checkQuorumPrecondition(zookeeper);
Stopwatch watch;
commitPart(zookeeper, part, "");
if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName()))
part_log->addNewPart(*part, watch.elapsed());
}
void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
assertSessionIsNotExpired(zookeeper);

View File

@ -27,6 +27,9 @@ public:
void write(const Block & block) override;
/// For ATTACHing existing data on filesystem.
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
private:
struct QuorumInfo
{
@ -40,7 +43,7 @@ private:
void checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper);
/// Rename temporary part and commit to ZooKeeper.
void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, String block_id);
void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id);
StorageReplicatedMergeTree & storage;
size_t quorum;

View File

@ -38,12 +38,6 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << new_part_name;
break;
case ATTACH_PART:
out << "attach\n"
<< "detached\n"
<< source_part_name << "\ninto\n" << new_part_name;
break;
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
@ -108,11 +102,13 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
}
else if (type_str == "attach")
{
/// Obsolete. TODO: Remove after half year.
type = ATTACH_PART;
String source_type;
in >> source_type;
if (source_type != "detached")
throw Exception("Bad format: expected 'detached', found '" + source_type + "'", ErrorCodes::CANNOT_PARSE_TEXT);
String source_part_name;
in >> "\n" >> source_part_name >> "\ninto\n" >> new_part_name;
}

View File

@ -35,7 +35,7 @@ struct ReplicatedMergeTreeLogEntryData
GET_PART, /// Get the part from another replica.
MERGE_PARTS, /// Merge the parts.
DROP_RANGE, /// Delete the parts in the specified month in the specified number range.
ATTACH_PART, /// Move a part from the `detached` directory.
ATTACH_PART, /// Move a part from the `detached` directory. Obsolete. TODO: Remove after half year.
};
String typeToString() const
@ -72,9 +72,6 @@ struct ReplicatedMergeTreeLogEntryData
/// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory.
bool detach = false;
/// For ATTACH_PART, the name of the part in the `detached` directory.
String source_part_name;
/// Access under queue_mutex, see ReplicatedMergeTreeQueue.
bool currently_executing = false; /// Whether the action is executing now.
/// These several fields are informational only (for viewing by the user using system tables).

View File

@ -184,12 +184,6 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
return;
}
if (part_info.left <= RESERVED_BLOCK_NUMBERS)
{
LOG_ERROR(log, "Won't add nonincrement_block_numbers because part is one of first in partition");
return;
}
const auto partition_str = part_name.substr(0, 6);
for (auto i = part_info.left; i <= part_info.right; ++i)
{

View File

@ -109,29 +109,6 @@ namespace ErrorCodes
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
/** Some data numbers are assigned to the data blocks that are integers.
* For blocks added in the usual way (INSERT), numbers are allocated in ascending order.
* Merges are made for the range of block numbers on a numeric line
* if the block numbers `x`, `z` are involved in the merge, and there is a block with the number `y` that `x < y < z`, then the block with the number `y` is also involved in the merge.
* This is required to save the properties of some operations that can be performed during a merge - for example, in CollapsingMergeTree.
* In particular, this allows during the merge to know that in one part all the data was added earlier than all the data in the other part.
*
* Occasionally there is a need to add to the table some obviously old part of data,
* so that it is perceived as old in the logic of CollapsingMergeTree.
* You can add such a part of data with a special ATTACH request.
* And in this case, we need to allocate to this part numbers smaller than the numbers of all the other parts.
* In this regard, the numbers of the regular parts added by INSERT do not start from zero, but from a larger number,
* and smaller numbers are considered "reserved".
*
* Why is this number 200?
* The fact is that previously negative block numbers were not supported.
* And also, the merge is done that way so that when you increase the number of parts, insertion of new parts slows down on purpose,
* until mergers have time to reduce the number of parts; and it was calculated for about 200 parts.
* So, when you insert all the parts from the other table into the table, 200 is sure enough.
* In turn, this number is chosen almost at random.
*/
extern const Int64 RESERVED_BLOCK_NUMBERS = 200;
/** There are three places for each part, where it should be
* 1. In the RAM, MergeTreeData::data_parts, all_data_parts.
@ -813,7 +790,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
*/
size_t unexpected_parts_nonnew = 0;
for (const auto & part : unexpected_parts)
if (part->level > 0 || part->right < RESERVED_BLOCK_NUMBERS)
if (part->level > 0)
++unexpected_parts_nonnew;
String sanity_report = "There are "
@ -1008,6 +985,12 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
if (entry.type == LogEntry::ATTACH_PART)
{
LOG_ERROR(log, "Log entries of type ATTACH_PART are obsolete. Skipping.");
return true;
}
if (entry.type == LogEntry::DROP_RANGE)
{
executeDropRange(entry);
@ -1015,8 +998,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::ATTACH_PART)
entry.type == LogEntry::MERGE_PARTS)
{
/// If we already have this part or a part covering it, we do not need to do anything.
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
@ -1046,10 +1028,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
do_fetch = true;
}
else if (entry.type == LogEntry::ATTACH_PART)
{
do_fetch = !executeAttachPart(entry);
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
std::stringstream log_message;
@ -1234,20 +1212,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
++current_table_fetches;
SCOPE_EXIT({--current_table_fetches;});
if (replica.empty() && entry.type == LogEntry::ATTACH_PART)
{
/** If ATTACH - a piece may not be here, because the replica, on which the part is, still did not have time to attach it.
* In that case, you need to wait for this.
*/
/// The part must be on the initiator replica.
if (entry.source_replica.empty() || entry.source_replica == replica_name)
throw Exception("Logical error: no source replica specified for ATTACH_PART log entry;"
" or trying to fetch part on source replica", ErrorCodes::LOGICAL_ERROR);
throw Exception("No active replica has attached part " + entry.new_part_name + " or covering part yet", ErrorCodes::NO_REPLICA_HAS_PART);
}
try
{
if (replica.empty())
@ -1262,9 +1226,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
if (entry.type != LogEntry::GET_PART)
throw Exception("Logical error: log entry with quorum but type is not GET_PART", ErrorCodes::LOGICAL_ERROR);
if (entry.block_id.empty())
throw Exception("Logical error: log entry with quorum has empty block_id", ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(log, "No active replica has part " << entry.new_part_name << " which needs to be written with quorum."
" Will try to mark that quorum as failed.");
@ -1341,7 +1302,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
zkutil::CreateMode::Persistent));
/// Deleting from `blocks`.
if (zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id))
if (!entry.block_id.empty() && zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id))
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(zookeeper_path + "/blocks/" + entry.block_id, -1));
auto code = zookeeper->tryMulti(ops);
@ -1473,42 +1434,6 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
}
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{
String source_path = "detached/" + entry.source_part_name;
LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);
if (!Poco::File(data.getFullPath() + source_path).exists())
{
LOG_INFO(log, "No part at " << source_path << ". Will fetch it instead");
return false;
}
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops, entry.new_part_name);
getZooKeeper()->multi(ops);
/// NOTE: We can not use `renameTempPartAndAdd`, because the part is not temporary - if something goes wrong, you do not need to delete it.
part->renameTo(entry.new_part_name);
part->name = entry.new_part_name;
ActiveDataPartSet::parsePartName(part->name, *part);
data.attachPart(part);
LOG_INFO(log, "Finished attaching part " << entry.new_part_name);
/// New parts with other data may appear at the place of the deleted parts.
context.resetCaches();
return true;
}
void StorageReplicatedMergeTree::queueUpdatingThread()
{
setThreadName("ReplMTQueueUpd");
@ -1655,28 +1580,12 @@ namespace
/// You can merge the parts, if all the numbers between them are abandoned - do not correspond to any blocks.
for (Int64 number = left->right + 1; number <= right->left - 1; ++number)
{
/** For numbers before RESERVED_BLOCK_NUMBERS AbandonableLock is not used
* - these numbers can not be "abandoned" - that is, not used for parts.
* These are the part numbers that were added using `ALTER ... ATTACH`.
* They should go without a gap (for each number there should be a part).
* We check that for all such numbers there are parts,
* otherwise, through the "holes" - missing parts, you can not merge.
*/
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
if (number < RESERVED_BLOCK_NUMBERS)
{
if (!data.hasBlockNumberInMonth(number, left->month))
return false;
}
else
{
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
return false;
}
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
return false;
}
return true;
@ -1693,8 +1602,6 @@ namespace
* Let's use a statement that if a couple of parts were possible to merge, and their merge is not yet planned,
* then now they can be merged, and we will remember this state,
* not to send multiple identical requests to ZooKeeper.
*
* TODO This works incorrectly with DROP PARTITION and then ATTACH PARTITION.
*/
/** Cache for function, that returns bool.
@ -1910,7 +1817,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Remove the unnecessary entries about non-existent blocks.
for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
for (Int64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
@ -2634,7 +2541,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// The name of an imaginary part covering all possible parts in the specified month with numbers in the specified range.
static String getFakePartNameForDrop(const String & month_name, UInt64 left, UInt64 right)
static String getFakePartNameForDrop(const String & month_name, UInt64 right)
{
/// The date range is all month long.
const auto & lut = DateLUT::instance();
@ -2642,9 +2549,8 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn
DayNum_t left_date = lut.toDayNum(start_time);
DayNum_t right_date = DayNum_t(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);
/// Level - `right-left+1`: part could not have been formed as a result of such or more mergers.
/// TODO This is not true for parts after ATTACH.
return ActiveDataPartSet::getPartName(left_date, right_date, left, right, right - left + 1);
/// Artificial high level is choosen, to make this part "covering" all parts inside.
return ActiveDataPartSet::getPartName(left_date, right_date, 0, right, 999999999);
}
@ -2692,12 +2598,6 @@ void StorageReplicatedMergeTree::dropPartition(
return;
}
auto number_and_exists = data.getMinBlockNumberForMonth(data.getMonthFromName(month_name));
/// Even if there is no data in the partition, you still need to mark the range for deletion.
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
Int64 left = number_and_exists.second ? number_and_exists.first : RESERVED_BLOCK_NUMBERS;
/** Let's skip one number in `block_numbers` for the month being deleted, and we will only delete parts until this number.
* This prohibits merges of deleted parts with the new inserted data.
* Invariant: merges of deleted parts with other parts do not appear in the log.
@ -2718,7 +2618,7 @@ void StorageReplicatedMergeTree::dropPartition(
throw Exception("Logical error: newly allocated block number is zero", ErrorCodes::LOGICAL_ERROR);
--right;
String fake_part_name = getFakePartNameForDrop(month_name, left, right);
String fake_part_name = getFakePartNameForDrop(month_name, right);
/** Forbid to choose the parts to be deleted for merging.
* Invariant: after the `DROP_RANGE` entry appears in the log, merge of deleted parts will not appear in the log.
@ -2728,7 +2628,7 @@ void StorageReplicatedMergeTree::dropPartition(
queue.disableMergesInRange(fake_part_name);
}
LOG_DEBUG(log, "Disabled merges in range " << left << " - " << right << " for month " << month_name);
LOG_DEBUG(log, "Disabled merges in range " << "0_" << right << " for partition " << month_name);
/// Finally, having achieved the necessary invariants, you can put an entry in the log.
LogEntry entry;
@ -2799,72 +2699,25 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & query, const Fie
}
}
/// Synchronously check that added parts exist and are not broken at least on this replica. We will write checksums.txt if it does not exist.
/// Synchronously check that added parts exist and are not broken. We will write checksums.txt if it does not exist.
LOG_DEBUG(log, "Checking parts");
std::vector<MergeTreeData::MutableDataPartPtr> loaded_parts;
for (const String & part : parts)
{
LOG_DEBUG(log, "Checking part " << part);
data.loadPartAndFixMetadata(source_dir + part);
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir + part));
}
/// Select maximum free numbers to blocks to be added, less than RESERVED_BLOCK_NUMBERS.
/// NOTE: The free numbers check does not synchronize. You can not perform multiple ATTACH/DETACH/DROP requests at the same time.
Int64 min_used_number = RESERVED_BLOCK_NUMBERS;
DayNum_t month = MergeTreeData::getMonthFromPartPrefix(partition);
auto num_and_exists = data.getMinBlockNumberForMonth(month);
if (num_and_exists.second && num_and_exists.first < min_used_number)
min_used_number = num_and_exists.first;
/// Add entries to the log.
std::reverse(parts.begin(), parts.end());
std::list<LogEntry> entries;
zkutil::Ops ops;
for (const String & part_name : parts)
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0); /// TODO Allow to use quorum here.
for (auto & part : loaded_parts)
{
ActiveDataPartSet::Part part;
ActiveDataPartSet::parsePartName(part_name, part);
part.left = part.right = --min_used_number;
part.level = 0; /// previous level has no sense after attach.
String new_part_name = ActiveDataPartSet::getPartName(part.left_date, part.right_date, part.left, part.right, part.level);
LOG_INFO(log, "Will attach " << part_name << " as " << new_part_name);
entries.emplace_back();
LogEntry & entry = entries.back();
entry.type = LogEntry::ATTACH_PART;
entry.source_replica = replica_name;
entry.source_part_name = part_name;
entry.new_part_name = new_part_name;
entry.create_time = time(0);
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
zookeeper_path + "/log/log-", entry.toString(), getZooKeeper()->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
}
LOG_DEBUG(log, "Adding attaches to log");
getZooKeeper()->multi(ops);
/// If necessary, wait until the operation is performed on itself or on all replicas.
if (settings.replication_alter_partitions_sync != 0)
{
size_t i = 0;
for (LogEntry & entry : entries)
{
String log_znode_path = dynamic_cast<zkutil::Op::Create &>(*ops[i]).getPathCreated();
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
if (settings.replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else
waitForAllReplicasToProcessLogEntry(entry);
++i;
}
String old_name = part->name;
output.writeExistingPart(part);
LOG_DEBUG(log, "Attached part " << old_name << " as " << part->name);
}
}
bool StorageReplicatedMergeTree::checkTableCanBeDropped() const
{
/// Consider only synchronized data
@ -2873,6 +2726,7 @@ bool StorageReplicatedMergeTree::checkTableCanBeDropped() const
return true;
}
void StorageReplicatedMergeTree::drop()
{
{
@ -2941,23 +2795,6 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name, zkutil::ZooKeeperPtr & zookeeper)
{
String month_path = zookeeper_path + "/block_numbers/" + month_name;
if (!existsNodeCached(month_path))
{
/// Create a node in `block_numbers` for the month and skip over it N = RESERVED_BLOCK_NUMBERS of increment values.
/// It is necessary that in the future, if needed, you can add data to the beginning.
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.emplace_back(std::make_unique<zkutil::Op::Create>(month_path, "", acl, zkutil::CreateMode::Persistent));
for (size_t i = 0; i < RESERVED_BLOCK_NUMBERS; ++i)
{
ops.emplace_back(std::make_unique<zkutil::Op::Create>(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(month_path + "/skip_increment", -1));
}
/// Ignore the errors - it could not work if someone else executed this line before us.
zookeeper->tryMulti(ops);
}
return AbandonableLockInZooKeeper(
zookeeper_path + "/block_numbers/" + month_name + "/block-",
zookeeper_path + "/temp", *zookeeper);

View File

@ -48,7 +48,6 @@ namespace DB
* Each entry is one of:
* - normal data insertion (GET),
* - merge (MERGE),
* - slightly less common data insertion (ATTACH),
* - delete the partition (DROP).
*
* Each replica copies (queueUpdatingThread, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
@ -400,7 +399,6 @@ private:
bool executeLogEntry(const LogEntry & entry);
void executeDropRange(const LogEntry & entry);
bool executeAttachPart(const LogEntry & entry); /// Returns false if the part is absent, and it needs to be picked up from another replica.
/** Updates the queue.
*/
@ -510,7 +508,6 @@ private:
};
extern const Int64 RESERVED_BLOCK_NUMBERS;
extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER;
}

View File

@ -33,7 +33,6 @@ StorageSystemReplicationQueue::StorageSystemReplicationQueue(const std::string &
{ "new_part_name", std::make_shared<DataTypeString>() },
{ "parts_to_merge", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "is_detach", std::make_shared<DataTypeUInt8>() },
{ "attach_source_part_name", std::make_shared<DataTypeString>() },
/// Processing status of item.
{ "is_currently_executing", std::make_shared<DataTypeUInt8>() },
{ "num_tries", std::make_shared<DataTypeUInt32>() },
@ -102,7 +101,6 @@ BlockInputStreams StorageSystemReplicationQueue::read(
ColumnWithTypeAndName col_parts_to_merge { std::make_shared<ColumnArray>(std::make_shared<ColumnString>()),
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "parts_to_merge" };
ColumnWithTypeAndName col_is_detach { std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "is_detach" };
ColumnWithTypeAndName col_attach_source_part_name{ std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "attach_source_part_name" };
ColumnWithTypeAndName col_is_currently_executing { std::make_shared<ColumnUInt8>(), std::make_shared<DataTypeUInt8>(), "is_currently_executing" };
ColumnWithTypeAndName col_num_tries { std::make_shared<ColumnUInt32>(), std::make_shared<DataTypeUInt32>(), "num_tries" };
ColumnWithTypeAndName col_last_exception { std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "last_exception" };
@ -142,7 +140,6 @@ BlockInputStreams StorageSystemReplicationQueue::read(
col_new_part_name .column->insert(entry.new_part_name);
col_parts_to_merge .column->insert(parts_to_merge);
col_is_detach .column->insert(UInt64(entry.detach));
col_attach_source_part_name .column->insert(entry.source_part_name);
col_is_currently_executing .column->insert(UInt64(entry.currently_executing));
col_num_tries .column->insert(UInt64(entry.num_tries));
col_last_exception .column->insert(entry.exception ? getExceptionMessage(entry.exception, false) : "");
@ -166,7 +163,6 @@ BlockInputStreams StorageSystemReplicationQueue::read(
col_new_part_name,
col_parts_to_merge,
col_is_detach,
col_attach_source_part_name,
col_is_currently_executing,
col_num_tries,
col_last_exception,