choose part type while selecting parts to merge

This commit is contained in:
CurtizJ 2020-02-11 16:41:26 +03:00
parent c72c38aea9
commit b26a8b5622
13 changed files with 111 additions and 41 deletions

View File

@ -340,7 +340,7 @@ void IMergeTreeDataPart::assertOnDisk() const
{
if (!isStoredOnDisk())
throw Exception("Data part '" + name + "' with type '"
+ typeToString(getType()) + "' is not stored on disk", ErrorCodes::LOGICAL_ERROR);
+ getType().toString() + "' is not stored on disk", ErrorCodes::LOGICAL_ERROR);
}
@ -416,7 +416,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
void IMergeTreeDataPart::loadIndexGranularity()
{
throw Exception("Method 'loadIndexGranularity' is not implemented for part with type " + typeToString(getType()), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method 'loadIndexGranularity' is not implemented for part with type " + getType().toString(), ErrorCodes::NOT_IMPLEMENTED);
}
void IMergeTreeDataPart::loadIndex()
@ -779,23 +779,6 @@ void IMergeTreeDataPart::remove() const
}
}
/// Maps a data part type to its display name:
/// "Wide", "Compact", "InMemory" or "Unknown".
String IMergeTreeDataPart::typeToString(Type type)
{
    const char * type_name = nullptr;

    switch (type)
    {
        case Type::WIDE:
            type_name = "Wide";
            break;
        case Type::COMPACT:
            type_name = "Compact";
            break;
        case Type::IN_MEMORY:
            type_name = "InMemory";
            break;
        case Type::UNKNOWN:
            type_name = "Unknown";
            break;
    }

    /// Reached only when `type` matched none of the cases above; treated as unreachable.
    if (!type_name)
        __builtin_unreachable();

    return type_name;
}
String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
/// Do not allow underscores in the prefix because they are used as separators.

View File

@ -113,8 +113,7 @@ public:
Type getType() const { return part_type; }
static String typeToString(Type type);
String getTypeName() const { return typeToString(getType()); }
String getTypeName() const { return getType().toString(); }
void setColumns(const NamesAndTypesList & new_columns);

View File

@ -1704,7 +1704,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const String & name, const MergeTreePartInfo & part_info,
const DiskPtr & disk, const String & relative_path) const
{
auto type = MergeTreeDataPartType::UNKNOWN;
MergeTreeDataPartType type;
auto full_path = getFullPathOnDisk(disk) + relative_path + "/";
auto mrk_ext = MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(full_path);

View File

@ -89,19 +89,25 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
UInt32 max_level = 0;
Int64 max_mutation = 0;
size_t sum_rows = 0;
size_t sum_bytes_uncompressed = 0;
for (const auto & part : parts)
{
max_level = std::max(max_level, part->info.level);
max_mutation = std::max(max_mutation, part->info.mutation);
sum_rows += part->rows_count;
sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
}
const auto & storage = parts.front()->storage;
type = storage.choosePartType(sum_bytes_uncompressed, sum_rows);
part_info.partition_id = parts.front()->info.partition_id;
part_info.min_block = parts.front()->info.min_block;
part_info.max_block = parts.back()->info.max_block;
part_info.level = max_level + 1;
part_info.mutation = max_mutation;
if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
DayNum min_date = DayNum(std::numeric_limits<UInt16>::max());
DayNum max_date = DayNum(std::numeric_limits<UInt16>::min());
@ -576,20 +582,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
all_columns, data.sorting_key_expr, data.skip_indices,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
size_t estimated_bytes_uncompressed = 0;
for (const auto & part : parts)
estimated_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
future_part.name,
future_part.type,
future_part.part_info,
space_reservation->getDisk(),
all_columns,
estimated_bytes_uncompressed,
sum_input_rows_upper_bound,
TMP_PREFIX + future_part.name);
new_data_part->setColumns(all_columns);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true;
@ -607,6 +607,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
need_remove_expired_values = false;
}
size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
@ -990,7 +991,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto new_data_part = data.createPart(
future_part.name,
source_part->getType(),
future_part.type,
future_part.part_info,
space_reservation->getDisk(),
"tmp_mut_" + future_part.name);

View File

@ -18,6 +18,7 @@ struct FutureMergedMutatedPart
{
String name;
String path;
MergeTreeDataPartType type;
MergeTreePartInfo part_info;
MergeTreeData::DataPartsVector parts;

View File

@ -78,7 +78,7 @@ ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const
if (bin_checksum != checksums.files.end())
{
total_size.data_compressed += bin_checksum->second.file_size;
total_size.data_compressed += bin_checksum->second.uncompressed_size;
total_size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension);

View File

@ -0,0 +1,43 @@
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_PART_TYPE;
}
/// Sets this part type from its textual name.
/// Recognized names (exact match): "Wide", "Compact", "InMemory", "Unknown".
/// Any other string throws an Exception with code UNKNOWN_PART_TYPE; the stored value is not modified in that case.
void MergeTreeDataPartType::fromString(const String & str)
{
    if (str == "Wide")
    {
        value = WIDE;
        return;
    }

    if (str == "Compact")
    {
        value = COMPACT;
        return;
    }

    if (str == "InMemory")
    {
        value = IN_MEMORY;
        return;
    }

    if (str == "Unknown")
    {
        value = UNKNOWN;
        return;
    }

    throw DB::Exception("Unexpected string for part type: " + str, ErrorCodes::UNKNOWN_PART_TYPE);
}
/// Returns the textual name of this part type:
/// "Wide", "Compact", "InMemory" or "Unknown".
String MergeTreeDataPartType::toString() const
{
    const char * type_name = nullptr;

    switch (value)
    {
        case WIDE:
            type_name = "Wide";
            break;
        case COMPACT:
            type_name = "Compact";
            break;
        case IN_MEMORY:
            type_name = "InMemory";
            break;
        case UNKNOWN:
            type_name = "Unknown";
            break;
    }

    /// Reached only when `value` matched none of the cases above; treated as unreachable.
    if (!type_name)
        __builtin_unreachable();

    return type_name;
}
}

View File

@ -1,11 +1,17 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
/// Types of data part format.
enum class MergeTreeDataPartType
/// Types of data part format.
class MergeTreeDataPartType
{
public:
enum Value
{
/// Data of each is stored in one or several (for complex types) files.
/// Data of each column is stored in one or several (for complex types) files.
/// Every data file is followed by marks file.
WIDE,
@ -17,4 +23,25 @@ namespace DB
UNKNOWN,
};
MergeTreeDataPartType() : value(UNKNOWN) {}
MergeTreeDataPartType(Value value_) : value(value_) {}
bool operator==(const MergeTreeDataPartType & other) const
{
return value == other.value;
}
bool operator!=(const MergeTreeDataPartType & other) const
{
return !(*this == other);
}
void fromString(const String & str);
String toString() const;
private:
Value value;
};
}

View File

@ -12,7 +12,7 @@ namespace DB
void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
{
out << "format version: 4\n"
out << "format version: 5\n"
<< "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n"
<< "source replica: " << source_replica << '\n'
<< "block_id: " << escape << block_id << '\n';
@ -29,6 +29,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << s << '\n';
out << "into\n" << new_part_name;
out << "\ndeduplicate: " << deduplicate;
out << "\npart_type: " << new_part_type.toString();
break;
case DROP_RANGE:
@ -82,7 +83,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> "format version: " >> format_version >> "\n";
if (format_version < 1 || format_version > 4)
if (format_version < 1 || format_version > 5)
throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
if (format_version >= 2)
@ -120,6 +121,12 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> new_part_name;
if (format_version >= 4)
in >> "\ndeduplicate: " >> deduplicate;
if (format_version >= 5)
{
String part_type_str;
in >> "\npart_type: " >> type_str;
new_part_type.fromString(type_str);
}
}
else if (type_str == "drop" || type_str == "detach")
{

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Core/Types.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <mutex>
#include <condition_variable>
@ -72,6 +73,7 @@ struct ReplicatedMergeTreeLogEntryData
/// The name of resulting part for GET_PART and MERGE_PARTS
/// Part range for DROP_RANGE and CLEAR_COLUMN
String new_part_name;
MergeTreeDataPartType new_part_type;
String block_id; /// For parts of level zero, the block identifier for deduplication (node name in /blocks/).
mutable String actual_new_part_name; /// GET_PART could actually fetch a part covering 'new_part_name'.

View File

@ -745,6 +745,7 @@ bool StorageMergeTree::tryMutatePart()
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
break;

View File

@ -1220,6 +1220,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = entry.new_part_name;
future_mutated_part.updatePath(*this, reserved_space);
future_mutated_part.type = source_part->getType();
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(
@ -2279,7 +2280,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
{
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl);
future_merged_part.name, future_merged_part.type, deduplicate, force_ttl);
}
/// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
@ -2344,6 +2345,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
zkutil::ZooKeeperPtr & zookeeper,
const DataPartsVector & parts,
const String & merged_name,
const MergeTreeDataPartType & merged_part_type,
bool deduplicate,
bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry)
@ -2380,12 +2382,15 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
entry.new_part_type = merged_part_type;
entry.deduplicate = deduplicate;
entry.force_ttl = force_ttl;
entry.create_time = time(nullptr);
for (const auto & part : parts)
{
entry.source_parts.push_back(part->name);
}
String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
@ -3152,7 +3157,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
future_merged_part, disk_space, can_merge, partition_id, true, nullptr);
ReplicatedMergeTreeLogEntryData merge_entry;
if (selected && !createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl, &merge_entry))
future_merged_part.name, future_merged_part.type, deduplicate, force_ttl, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY)
merge_entries.push_back(std::move(merge_entry));
@ -3189,7 +3194,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
ReplicatedMergeTreeLogEntryData merge_entry;
if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl, &merge_entry))
future_merged_part.name, future_merged_part.type, deduplicate, force_ttl, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY)
merge_entries.push_back(std::move(merge_entry));

View File

@ -438,6 +438,7 @@ private:
zkutil::ZooKeeperPtr & zookeeper,
const DataPartsVector & parts,
const String & merged_name,
const MergeTreeDataPartType & merged_part_type,
bool deduplicate,
bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);