polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-11-21 19:10:22 +03:00
parent 435060146b
commit 94abf3691c
21 changed files with 374 additions and 230 deletions

View File

@ -1,6 +1,5 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/typeid_cast.h>
@ -269,10 +268,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = createPart(data, reservation->getDisk(), part_name, relative_part_path);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(),relative_part_path);
new_data_part->is_temp = true;
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
{

View File

@ -138,13 +138,11 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
IMergeTreeDataPart::IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, index_granularity_info(index_granularity_info_)
, disk(disk_)
, relative_path(relative_path_.value_or(name_))
{
@ -154,13 +152,11 @@ IMergeTreeDataPart::IMergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: storage(storage_)
, name(name_)
, info(info_)
, index_granularity_info(index_granularity_info_)
, disk(disk_)
, relative_path(relative_path_.value_or(name_))
{
@ -246,6 +242,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & columns_)
columns = columns_;
for (const auto & column : columns)
sample_block.insert({column.type, column.name});
index_granularity_info.initialize(storage, getType(), columns.size());
}
IMergeTreeDataPart::~IMergeTreeDataPart() = default;
@ -776,6 +773,8 @@ String IMergeTreeDataPart::typeToString(Type type)
return "Striped";
case Type::IN_MEMORY:
return "InMemory";
case Type::UNKNOWN:
return "Unknown";
}
__builtin_unreachable();

View File

@ -85,7 +85,7 @@ public:
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
virtual String getColumnNameWithMinumumCompressedSize() const = 0;
virtual String getColumnNameWithMinumumCompressedSize() const { return columns.front().name; }
// virtual Checksums check(
// bool require_checksums,
@ -100,12 +100,7 @@ public:
virtual void accumulateColumnSizes(ColumnToSize & column_to_size) const;
enum class Type
{
WIDE,
COMPACT,
IN_MEMORY,
};
using Type = MergeTreeDataPartType;
virtual Type getType() const = 0;
@ -121,14 +116,12 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk = {},
const std::optional<String> & relative_path = {});
IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk = {},
const std::optional<String> & relative_path = {});

View File

@ -3,12 +3,20 @@
namespace DB
{
class IMergeTreeDataPart;
class IMergeTreeReader;
class IMergeTreeWriter;
// class IMergeTreeDataPart;
// class IMergeTreeReader;
// class IMergeTreeWriter;
using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeWriter>;
// using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
// using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
// using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
// using MergeTreeWriterPtr = std::unique_ptr<IMergeTreeWriter>;
enum class MergeTreeDataPartType
{
WIDE,
COMPACT,
IN_MEMORY,
UNKNOWN,
};
}

View File

@ -8,6 +8,8 @@
namespace DB
{
/// FIXME: implement for compact parts
NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData::DataPartPtr & part, Names & columns)
{
NameSet required_columns{std::begin(columns), std::end(columns)};

View File

@ -4,8 +4,9 @@
#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/AlterCommands.h>
@ -817,8 +818,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return;
MutableDataPartPtr part = createPart(*this, part_disk_ptr, part_name, part_info, part_name);
auto part = createPart(part_name, part_info, part_disk_ptr, part_name);
bool broken = false;
try
@ -1411,6 +1412,8 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
}
/// FIXME implement alter for compact parts
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
@ -1423,7 +1426,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
if (part)
part_mrk_file_extension = part->index_granularity_info.marks_file_extension;
else
part_mrk_file_extension = settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension();
part_mrk_file_extension = settings->index_granularity_bytes == 0 ? getNonAdaptiveMrkExtension() : getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE); /// FIXME support compact parts
using NameToType = std::map<String, const IDataType *>;
NameToType new_types;
@ -1575,6 +1578,99 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
}
}
/// FIXME implement createPart without columns and with loadMetadata
MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_on_disk, size_t rows_count) const
{
const auto settings = getSettings();
if (bytes_on_disk < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
return MergeTreeDataPartType::COMPACT;
return MergeTreeDataPartType::WIDE;
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, MergeTreeDataPartType type,
const DiskSpace::DiskPtr & disk, const String & relative_path) const
{
return createPart(name, type, MergeTreePartInfo::fromPartName(name, format_version), disk, relative_path);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, MergeTreeDataPartType type,
const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk, const String & relative_path) const
{
if (type == MergeTreeDataPartType::COMPACT)
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, disk, relative_path);
else if (type == MergeTreeDataPartType::WIDE)
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, disk, relative_path);
else
throw Exception("Unknown part type", ErrorCodes::LOGICAL_ERROR);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
const DiskSpace::DiskPtr & disk, const NamesAndTypesList & columns,
size_t bytes_on_disk, size_t rows_num, const String & relative_path) const
{
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version),
disk, columns, bytes_on_disk, rows_num, relative_path);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk, const NamesAndTypesList & columns,
size_t bytes_on_disk, size_t rows_count, const String & relative_path) const
{
auto part = createPart(name, choosePartType(bytes_on_disk, rows_count), part_info, disk, relative_path);
part->setColumns(columns);
part->bytes_on_disk = bytes_on_disk;
part->rows_count = rows_count;
return part;
}
static MergeTreeDataPartType getPartTypeFromMarkExtension(const String & mrk_ext)
{
if (mrk_ext == getNonAdaptiveMrkExtension())
return MergeTreeDataPartType::WIDE;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE))
return MergeTreeDataPartType::WIDE;
if (mrk_ext == getAdaptiveMrkExtension(MergeTreeDataPartType::COMPACT))
return MergeTreeDataPartType::COMPACT;
return MergeTreeDataPartType::UNKNOWN;
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
const DiskSpace::DiskPtr & disk, const String & relative_path) const
{
return createPart(name, MergeTreePartInfo::fromPartName(name, format_version), disk, relative_path);
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk, const String & relative_path) const
{
auto type = MergeTreeDataPartType::UNKNOWN;
auto full_path = getFullPathOnDisk(disk) + relative_path + "/";
auto mrk_ext = MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(full_path);
if (mrk_ext)
type = getPartTypeFromMarkExtension(*mrk_ext);
else
/// Didn't find any mark file, suppose that part is empty.
type = choosePartType(0, 0);
MutableDataPartPtr part;
/// FIXME do not pass emty granularity_info
if (type == MergeTreeDataPartType::COMPACT)
part = std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, disk, relative_path);
else if (type == MergeTreeDataPartType::WIDE)
part = std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, disk, relative_path);
else
throw Exception("Unknown part type", ErrorCodes::LOGICAL_ERROR);
return part;
}
void MergeTreeData::alterDataPart(
const NamesAndTypesList & new_columns,
const IndicesASTs & new_indices,
@ -2589,7 +2685,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskSpace::DiskPtr & disk, const String & relative_path)
{
MutableDataPartPtr part = createPart(*this, disk, Poco::Path(relative_path).getFileName(), relative_path);
MutableDataPartPtr part = createPart(Poco::Path(relative_path).getFileName(), disk, relative_path);
loadPartAndFixMetadata(part);
return part;
}
@ -3027,8 +3123,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part " << part_names.second);
MutableDataPartPtr part = createPart(
*this, name_to_disk[part_names.first], part_names.first, source_dir + part_names.second);
MutableDataPartPtr part = createPart(part_names.first, name_to_disk[part_names.first], source_dir + part_names.second);
loadPartAndFixMetadata(part);
loaded_parts.push_back(part);
}
@ -3240,8 +3335,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
localBackup(src_part_absolute_path, dst_part_absolute_path);
MergeTreeData::MutableDataPartPtr dst_data_part = createPart(
*this, reservation->getDisk(), dst_part_name, dst_part_info, tmp_dst_part_name);
auto dst_data_part = createPart(dst_part_name, dst_part_info, reservation->getDisk(), tmp_dst_part_name);
dst_data_part->is_temp = true;

View File

@ -178,6 +178,34 @@ public:
using DataPartsLock = std::unique_lock<std::mutex>;
DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }
MergeTreeDataPartType choosePartType(size_t bytes_on_disk, size_t rows_count) const;
/// FIXME remove version with columns.
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk, const NamesAndTypesList & columns,
size_t bytes_on_disk, size_t rows_num, const String & relative_path) const;
MutableDataPartPtr createPart(const String & name,
const DiskSpace::DiskPtr & disk, const NamesAndTypesList & columns,
size_t bytes_on_disk, size_t rows_num, const String & relative_path) const;
/// After this methods loadColumnsChecksumsIndexes must be called
/// FIXME make this inside this function
MutableDataPartPtr createPart(const String & name,
const DiskSpace::DiskPtr & disk, const String & relative_path) const;
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk, const String & relative_path) const;
MutableDataPartPtr createPart(const String & name, MergeTreeDataPartType type,
const DiskSpace::DiskPtr & disk, const String & relative_path) const;
MutableDataPartPtr createPart(const String & name, MergeTreeDataPartType type,
const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk, const String & relative_path) const;
/// Auxiliary object to add a set of parts into the working set in two steps:
/// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
/// * Next, if commit() is called, the parts are added to the active set and the parts that are

View File

@ -9,7 +9,6 @@
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
@ -564,9 +563,20 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
extractMergingAndGatheringColumns(
all_columns, data.sorting_key_expr, data.skip_indices,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
size_t total_bytes = 0;
size_t total_rows = 0;
for (const auto & part : future_part.parts)
{
total_bytes += part->bytes_on_disk;
total_rows += part->rows_count;
}
MergeTreeData::MutableDataPartPtr new_data_part = createPart(
data, space_reservation->getDisk(), future_part.name, future_part.part_info, TMP_PREFIX + future_part.name);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
future_part.name, future_part.part_info,
space_reservation->getDisk(),
all_columns, total_bytes, total_rows,
TMP_PREFIX + future_part.name);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true;
@ -950,8 +960,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
MergeTreeData::MutableDataPartPtr new_data_part = createPart(
data, space_reservation->getDisk(), future_part.name, future_part.part_info, "tmp_mut_" + future_part.name);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
future_part.name, source_part->getType(),
future_part.part_info, space_reservation->getDisk(),
"tmp_mut_" + future_part.name);
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
@ -1060,7 +1072,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
/// Don't change granularity type while mutating subset of columns
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType()) : getNonAdaptiveMrkExtension();
for (const auto & entry : updated_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)

View File

@ -55,10 +55,9 @@ namespace ErrorCodes
MergeTreeDataPartCompact::MergeTreeDataPartCompact(
MergeTreeData & storage_,
const String & name_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, index_granularity_info_, disk_, relative_path_)
: IMergeTreeDataPart(storage_, name_, disk_, relative_path_)
{
}
@ -66,10 +65,9 @@ MergeTreeDataPartCompact::MergeTreeDataPartCompact(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, index_granularity_info_, disk_, relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, disk_, relative_path_)
{
}
@ -139,31 +137,34 @@ ColumnSize MergeTreeDataPartCompact::getColumnSizeImpl(
*/
String MergeTreeDataPartCompact::getColumnNameWithMinumumCompressedSize() const
{
const auto & storage_columns = storage.getColumns().getAllPhysical();
const std::string * minimum_size_column = nullptr;
UInt64 minimum_size = std::numeric_limits<UInt64>::max();
/// FIXME: save column sizes
for (const auto & column : storage_columns)
{
if (!hasColumnFiles(column.name, *column.type))
continue;
// const auto & storage_columns = storage.getColumns().getAllPhysical();
// const std::string * minimum_size_column = nullptr;
// UInt64 minimum_size = std::numeric_limits<UInt64>::max();
const auto size = getColumnSizeImpl(column.name, *column.type, nullptr).data_compressed;
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column.name;
}
}
// for (const auto & column : storage_columns)
// {
// if (!hasColumnFiles(column.name, *column.type))
// continue;
if (!minimum_size_column)
throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
// const auto size = getColumnSizeImpl(column.name, *column.type, nullptr).data_compressed;
// if (size < minimum_size)
// {
// minimum_size = size;
// minimum_size_column = &column.name;
// }
// }
return *minimum_size_column;
// if (!minimum_size_column)
// throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);
return columns.front().name;
}
void MergeTreeDataPartCompact::loadIndexGranularity()
{
index_granularity_info.initialize(storage, getType(), columns.size());
String full_path = getFullPath();
if (columns.empty())
@ -196,105 +197,109 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
void MergeTreeDataPartCompact::checkConsistency(bool require_part_metadata)
{
String path = getFullPath();
UNUSED(require_part_metadata);
/// FIXME implement for compact parts
if (!checksums.empty())
{
if (!storage.primary_key_columns.empty() && !checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (require_part_metadata)
{
for (const NameAndTypePair & name_type : columns)
{
IDataType::SubstreamPath stream_path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
String bin_file_name = file_name + ".bin";
if (!checksums.files.count(mrk_file_name))
throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
ErrorCodes::NO_FILE_IN_DATA_PART);
if (!checksums.files.count(bin_file_name))
throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + path,
ErrorCodes::NO_FILE_IN_DATA_PART);
}, stream_path);
}
}
// String path = getFullPath();
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (!checksums.files.count("count.txt"))
throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
// if (!checksums.empty())
// {
// if (!storage.primary_key_columns.empty() && !checksums.files.count("primary.idx"))
// throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.partition_key_expr && !checksums.files.count("partition.dat"))
throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
// if (require_part_metadata)
// {
// for (const NameAndTypePair & name_type : columns)
// {
// IDataType::SubstreamPath stream_path;
// name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
// {
// String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
// String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
// String bin_file_name = file_name + ".bin";
// if (!checksums.files.count(mrk_file_name))
// throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
// ErrorCodes::NO_FILE_IN_DATA_PART);
// if (!checksums.files.count(bin_file_name))
// throw Exception("No " + bin_file_name + " file checksum for column " + name_type.name + " in part " + path,
// ErrorCodes::NO_FILE_IN_DATA_PART);
// }, stream_path);
// }
// }
if (!isEmpty())
{
for (const String & col_name : storage.minmax_idx_columns)
{
if (!checksums.files.count("minmax_" + escapeForFileName(col_name) + ".idx"))
throw Exception("No minmax idx file checksum for column " + col_name, ErrorCodes::NO_FILE_IN_DATA_PART);
}
}
}
// if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
// {
// if (!checksums.files.count("count.txt"))
// throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
checksums.checkSizes(path);
}
else
{
auto check_file_not_empty = [&path](const String & file_path)
{
Poco::File file(file_path);
if (!file.exists() || file.getSize() == 0)
throw Exception("Part " + path + " is broken: " + file_path + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
return file.getSize();
};
// if (storage.partition_key_expr && !checksums.files.count("partition.dat"))
// throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
/// Check that the primary key index is not empty.
if (!storage.primary_key_columns.empty())
check_file_not_empty(path + "primary.idx");
// if (!isEmpty())
// {
// for (const String & col_name : storage.minmax_idx_columns)
// {
// if (!checksums.files.count("minmax_" + escapeForFileName(col_name) + ".idx"))
// throw Exception("No minmax idx file checksum for column " + col_name, ErrorCodes::NO_FILE_IN_DATA_PART);
// }
// }
// }
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(path + "count.txt");
// checksums.checkSizes(path);
// }
// else
// {
// auto check_file_not_empty = [&path](const String & file_path)
// {
// Poco::File file(file_path);
// if (!file.exists() || file.getSize() == 0)
// throw Exception("Part " + path + " is broken: " + file_path + " is empty", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
// return file.getSize();
// };
if (storage.partition_key_expr)
check_file_not_empty(path + "partition.dat");
// /// Check that the primary key index is not empty.
// if (!storage.primary_key_columns.empty())
// check_file_not_empty(path + "primary.idx");
for (const String & col_name : storage.minmax_idx_columns)
check_file_not_empty(path + "minmax_" + escapeForFileName(col_name) + ".idx");
}
// if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
// {
// check_file_not_empty(path + "count.txt");
/// Check that all marks are nonempty and have the same size.
// if (storage.partition_key_expr)
// check_file_not_empty(path + "partition.dat");
std::optional<UInt64> marks_size;
for (const NameAndTypePair & name_type : columns)
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension);
// for (const String & col_name : storage.minmax_idx_columns)
// check_file_not_empty(path + "minmax_" + escapeForFileName(col_name) + ".idx");
// }
/// Missing file is Ok for case when new column was added.
if (file.exists())
{
UInt64 file_size = file.getSize();
// /// Check that all marks are nonempty and have the same size.
if (!file_size)
throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
// std::optional<UInt64> marks_size;
// for (const NameAndTypePair & name_type : columns)
// {
// name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
// {
// Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + index_granularity_info.marks_file_extension);
if (!marks_size)
marks_size = file_size;
else if (file_size != *marks_size)
throw Exception("Part " + path + " is broken: marks have different sizes.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
});
}
}
// /// Missing file is Ok for case when new column was added.
// if (file.exists())
// {
// UInt64 file_size = file.getSize();
// if (!file_size)
// throw Exception("Part " + path + " is broken: " + file.path() + " is empty.",
// ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
// if (!marks_size)
// marks_size = file_size;
// else if (file_size != *marks_size)
// throw Exception("Part " + path + " is broken: marks have different sizes.",
// ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
// }
// });
// }
// }
}
MergeTreeDataPartCompact::~MergeTreeDataPartCompact()

View File

@ -38,14 +38,12 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_ = {});
MergeTreeDataPartCompact(
MergeTreeData & storage_,
const String & name_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_ = {});

View File

@ -1,29 +0,0 @@
#include "MergeTreeDataPartFactory.h"
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
namespace DB
{
std::shared_ptr<IMergeTreeDataPart> createPart(const MergeTreeData & storage, const DiskSpace::DiskPtr & disk, const String & name,
const MergeTreePartInfo & info, const String & relative_path)
{
// /// FIXME
// size_t size_of_mark = sizeof(size_t) + sizeof(size_t) * 2 * storage.getColumns().getAllPhysical().size();
// MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk3", size_of_mark);
// return std::make_shared<MergeTreeDataPartCompact>(storage, name, info, index_granularity_info, disk, relative_path);
MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk2", sizeof(size_t) * 3);
return std::make_shared<MergeTreeDataPartWide>(storage, name, info, index_granularity_info, disk, relative_path);
}
std::shared_ptr<IMergeTreeDataPart> createPart(MergeTreeData & storage, const DiskSpace::DiskPtr & disk,
const String & name, const String & relative_path)
{
// /// FIXME
// size_t size_of_mark = sizeof(size_t) + sizeof(size_t) * 2 * storage.getColumns().getAllPhysical().size();
// MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk3", size_of_mark);
// return std::make_shared<MergeTreeDataPartCompact>(storage, name, index_granularity_info, disk, relative_path);
MergeTreeIndexGranularityInfo index_granularity_info(storage, ".mrk2", sizeof(size_t) * 3);
return std::make_shared<MergeTreeDataPartWide>(storage, name, index_granularity_info, disk, relative_path);
}
}

View File

@ -1,10 +0,0 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
namespace DB
{
std::shared_ptr<IMergeTreeDataPart> createPart(const MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_, const MergeTreePartInfo & info_, const String & relative_path);
std::shared_ptr<IMergeTreeDataPart> createPart(MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_, const String & relative_path);
}

View File

@ -56,10 +56,9 @@ namespace ErrorCodes
MergeTreeDataPartWide::MergeTreeDataPartWide(
MergeTreeData & storage_,
const String & name_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, index_granularity_info_, disk_, relative_path_)
: IMergeTreeDataPart(storage_, name_, disk_, relative_path_)
{
}
@ -67,10 +66,9 @@ MergeTreeDataPartWide::MergeTreeDataPartWide(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, index_granularity_info_, disk_, relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, disk_, relative_path_)
{
}
@ -162,6 +160,7 @@ String MergeTreeDataPartWide::getColumnNameWithMinumumCompressedSize() const
void MergeTreeDataPartWide::loadIndexGranularity()
{
index_granularity_info.initialize(storage, getType(), columns.size());
String full_path = getFullPath();
index_granularity_info.changeGranularityIfRequired(full_path);

View File

@ -38,14 +38,12 @@ public:
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk,
const std::optional<String> & relative_path = {});
MergeTreeDataPartWide(
MergeTreeData & storage_,
const String & name_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
const DiskSpace::DiskPtr & disk,
const std::optional<String> & relative_path = {});

View File

@ -1,6 +1,5 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Interpreters/AggregationCommon.h>
@ -205,8 +204,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
auto reservation = data.reserveSpace(expected_size);
MergeTreeData::MutableDataPartPtr new_data_part =
createPart(data, reservation->getDisk(), part_name, new_part_info, TMP_PREFIX + part_name);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
part_name, new_part_info,
reservation->getDisk(), block.getNamesAndTypesList(),
expected_size, block.rows(),
TMP_PREFIX + part_name);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);

View File

@ -7,14 +7,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_part) const
std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_part)
{
if (Poco::File(path_to_part).exists())
{
@ -22,44 +15,62 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
for (Poco::DirectoryIterator part_it(path_to_part); part_it != end; ++part_it)
{
const auto & ext = "." + part_it.path().getExtension();
if (ext == getNonAdaptiveMrkExtension() || ext == getAdaptiveMrkExtension())
if (ext == getNonAdaptiveMrkExtension()
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE)
|| ext == getAdaptiveMrkExtension(MergeTreeDataPartType::COMPACT))
return ext;
}
}
return {};
}
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage,
const String & marks_file_extension_, UInt8 mark_size_in_bytes_)
: marks_file_extension(marks_file_extension_), mark_size_in_bytes(mark_size_in_bytes_)
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
const MergeTreeData & storage, MergeTreeDataPartType part_type, size_t columns_num)
{
initialize(storage, part_type, columns_num);
}
void MergeTreeIndexGranularityInfo::initialize(const MergeTreeData & storage, MergeTreeDataPartType part_type, size_t columns_num)
{
if (initialized)
return;
const auto storage_settings = storage.getSettings();
fixed_index_granularity = storage_settings->index_granularity;
/// Granularity is fixed
if (!storage.canUseAdaptiveGranularity())
{
if (part_type != MergeTreeDataPartType::WIDE)
throw ""; /// FIXME normal exception
setNonAdaptive();
}
else
setAdaptive(storage_settings->index_granularity_bytes);
setAdaptive(storage_settings->index_granularity_bytes, part_type, columns_num);
}
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const String & path)
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const std::string & path_to_part)
{
auto mrk_ext = getMrkExtensionFromFS(path);
if (mrk_ext && *mrk_ext == ".mrk") /// TODO
/// FIXME check when we cant create compact part
auto mrk_ext = getMrkExtensionFromFS(path_to_part);
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
setNonAdaptive();
}
void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_, MergeTreeDataPartType part_type, size_t columns_num)
{
is_adaptive = true;
mark_size_in_bytes = getAdaptiveMrkSize(part_type, columns_num);
marks_file_extension = getAdaptiveMrkExtension(part_type);
index_granularity_bytes = index_granularity_bytes_;
}
void MergeTreeIndexGranularityInfo::setNonAdaptive()
{
is_adaptive = false;
mark_size_in_bytes = getNonAdaptiveMrkSize();
marks_file_extension = getNonAdaptiveMrkExtension();
index_granularity_bytes = 0;
}

View File

@ -2,54 +2,83 @@
#include <optional>
#include <Core/Types.h>
// #include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart_fwd.h>
namespace DB
{
class MergeTreeData;
class IMergeTreeDataPart;
/// Meta information about index granularity
struct MergeTreeIndexGranularityInfo
{
public:
using MergeTreeDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
/// Marks file extension '.mrk' or '.mrk2'
String marks_file_extension;
/// Size of one mark in file two or three size_t numbers
UInt8 mark_size_in_bytes;
UInt16 mark_size_in_bytes = 0;
/// Is stride in rows between marks non fixed?
bool is_adaptive;
bool is_adaptive = false;
/// Fixed size in rows of one granule if index_granularity_bytes is zero
size_t fixed_index_granularity;
size_t fixed_index_granularity = 0;
/// Approximate bytes size of one granule
size_t index_granularity_bytes;
size_t index_granularity_bytes = 0;
MergeTreeIndexGranularityInfo(const MergeTreeData & storage,
const String & mark_file_extension_, UInt8 mark_size_in_bytes_);
bool initialized = false;
void changeGranularityIfRequired(const String & path);
MergeTreeIndexGranularityInfo() {}
MergeTreeIndexGranularityInfo(
const MergeTreeData & storage, MergeTreeDataPartType part_type, size_t columns_num);
void initialize(const MergeTreeData & storage, MergeTreeDataPartType part_type, size_t columns_num);
void changeGranularityIfRequired(const std::string & path_to_part);
String getMarksFilePath(const String & path_prefix) const
{
return path_prefix + marks_file_extension;
}
private:
void setAdaptive(size_t index_granularity_bytes_);
static std::optional<std::string> getMrkExtensionFromFS(const std::string & path_to_table);
private:
void setAdaptive(size_t index_granularity_bytes_, MergeTreeDataPartType part_type, size_t columns_num);
void setNonAdaptive();
std::optional<std::string> getMrkExtensionFromFS(const std::string & path_to_table) const;
void setCompactAdaptive(size_t index_granularity_bytes_, size_t columns_num);
};
constexpr inline auto getNonAdaptiveMrkExtension() { return ".mrk"; }
constexpr inline auto getAdaptiveMrkExtension() { return ".mrk2"; }
constexpr inline auto getNonAdaptiveMrkSize() { return sizeof(UInt64) * 2; }
constexpr inline auto getAdaptiveMrkSize() { return sizeof(UInt64) * 3; }
inline std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)
{
switch(part_type)
{
case MergeTreeDataPartType::WIDE:
return ".mrk2";
case MergeTreeDataPartType::COMPACT:
return ".mrk3";
default:
throw "unknown part type"; /// FIXME normal exception
}
}
inline size_t getAdaptiveMrkSize(MergeTreeDataPartType part_type, size_t columns_num)
{
switch(part_type)
{
case MergeTreeDataPartType::WIDE:
return sizeof(UInt64) * 3;
case MergeTreeDataPartType::COMPACT:
return sizeof(UInt64) * (columns_num * 2 + 1);
default:
throw "unknown part type"; /// FIXME normal exception
}
}
}

View File

@ -20,7 +20,7 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
if (!marks)
loadMarks();
if (column_index >= columns_num)
throw Exception("", ErrorCodes::LOGICAL_ERROR);
throw Exception("", ErrorCodes::LOGICAL_ERROR); /// FIXME better exception
return (*marks)[row_index * columns_num + column_index];
}

View File

@ -1,6 +1,5 @@
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <set>
#include <boost/algorithm/string/join.hpp>
@ -145,7 +144,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space);
MergeTreeData::MutableDataPartPtr cloned_part =
createPart(*data, moving_part.reserved_space->getDisk(), moving_part.part->name, "detached/" + moving_part.part->name);
data->createPart(moving_part.part->name, moving_part.reserved_space->getDisk(), "detached/" + moving_part.part->name);
LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath());
cloned_part->loadColumnsChecksumsIndexes(true, true);

View File

@ -124,6 +124,9 @@ void MergeTreeReaderCompact::readData(
void MergeTreeReaderCompact::initMarksLoader()
{
if (marks_loader.initialized())
return;
const auto & index_granularity_info = data_part->index_granularity_info;
size_t marks_count = data_part->getMarksCount();
std::string mrk_path = index_granularity_info.getMarksFilePath(path + NAME_OF_FILE_WITH_DATA);

View File

@ -25,9 +25,14 @@ class ASTStorage;
struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
{
/// FIXME description for settings
#define LIST_OF_MERGE_TREE_SETTINGS(M) \
M(SettingUInt64, index_granularity, 8192, "How many rows correspond to one primary key value.") \
\
/** Data storing format settigns. */ \
M(SettingUInt64, min_bytes_for_wide_part, 0, "") \
M(SettingUInt64, min_rows_for_wide_part, 0, "") \
\
/** Merge settings. */ \
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).") \