Large refactoring (only compilable)

alesapin 2019-03-25 16:55:24 +03:00
parent 673a72eac6
commit aba51a11ba
30 changed files with 414 additions and 223 deletions

View File

@ -0,0 +1,93 @@
#include <Storages/MergeTree/IndexGranularity.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IndexGranularity::IndexGranularity(const std::vector<size_t> & marks_to_rows_)
: marks_to_rows(marks_to_rows_)
{
for (size_t rows_count : marks_to_rows)
total_rows += rows_count;
}
IndexGranularity::IndexGranularity(size_t marks_count, size_t fixed_granularity)
: marks_to_rows(marks_count, fixed_granularity)
, total_rows(marks_count * fixed_granularity)
{
}
size_t IndexGranularity::getAvgGranularity() const
{
if (marks_to_rows.empty())
throw Exception("Trying to compute average index granularity with zero marks", ErrorCodes::LOGICAL_ERROR);
return total_rows / marks_to_rows.size();
}
size_t IndexGranularity::getMarkStartingRow(size_t mark_index) const
{
size_t result = 0;
for (size_t i = 0; i < mark_index; ++i)
result += marks_to_rows[i];
return result;
}
size_t IndexGranularity::getMarksCount() const
{
return marks_to_rows.size();
}
size_t IndexGranularity::getTotalRows() const
{
return total_rows;
}
void IndexGranularity::appendMark(size_t rows_count)
{
total_rows += rows_count;
marks_to_rows.push_back(rows_count);
}
size_t IndexGranularity::getMarkRows(size_t mark_index) const
{
if (mark_index >= marks_to_rows.size())
throw Exception("Trying to get mark rows for mark " + toString(mark_index) + " while marks count is " + toString(marks_to_rows.size()), ErrorCodes::LOGICAL_ERROR);
return marks_to_rows[mark_index];
}
size_t IndexGranularity::getRowsCountInRange(const MarkRange & range) const
{
size_t rows_count = 0;
for (size_t i = range.begin; i < range.end; ++i)
rows_count += getMarkRows(i);
return rows_count;
}
size_t IndexGranularity::getRowsCountInRanges(const MarkRanges & ranges) const
{
size_t total = 0;
for (const auto & range : ranges)
total += getRowsCountInRange(range);
return total;
}
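/// NB: assumes the granularity is empty. resize() only pads the vector up to
/// `size` with fixed_granularity, so adding size * fixed_granularity to
/// total_rows is correct only when starting from zero marks.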
void IndexGranularity::resizeWithFixedGranularity(size_t size, size_t fixed_granularity)
{
marks_to_rows.resize(size, fixed_granularity);
total_rows += size * fixed_granularity;
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <vector>
#include <Storages/MergeTree/MarkRange.h>
namespace DB
{
class IndexGranularity
{
private:
std::vector<size_t> marks_to_rows;
size_t total_rows = 0;
public:
IndexGranularity() = default;
explicit IndexGranularity(const std::vector<size_t> & marks_to_rows_);
IndexGranularity(size_t marks_count, size_t fixed_granularity);
size_t getRowsCountInRange(const MarkRange & range) const;
size_t getRowsCountInRanges(const MarkRanges & ranges) const;
size_t getAvgGranularity() const;
size_t getMarksCount() const;
size_t getTotalRows() const;
size_t getMarkRows(size_t mark_index) const;
size_t getMarkStartingRow(size_t mark_index) const;
size_t getLastMarkRows() const
{
return marks_to_rows.back();
}
bool empty() const
{
return marks_to_rows.empty();
}
void appendMark(size_t rows_count);
void resizeWithFixedGranularity(size_t size, size_t fixed_granularity);
};
}
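A minimal usage sketch of the new class, added for review context rather than taken from the commit. It assumes only the header above and that MarkRange is brace-constructible from (begin, end); it fills the structure mark by mark the same way loadIndexGranularity() does below.

#include <Storages/MergeTree/IndexGranularity.h>
#include <cassert>

int main()
{
    /// Adaptive granularity: a different number of rows behind each mark.
    DB::IndexGranularity granularity;
    granularity.appendMark(8192);
    granularity.appendMark(4096);
    granularity.appendMark(100); /// last, possibly incomplete granule

    assert(granularity.getMarksCount() == 3);
    assert(granularity.getTotalRows() == 12388);
    assert(granularity.getAvgGranularity() == 12388 / 3);
    assert(granularity.getMarkStartingRow(2) == 8192 + 4096);
    assert(granularity.getLastMarkRows() == 100);
    assert(granularity.getRowsCountInRange({0, 2}) == 12288); /// marks 0 and 1
    return 0;
}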

View File

@ -28,7 +28,7 @@ MergeListElement::MergeListElement(const std::string & database, const std::stri
std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);
total_size_bytes_compressed += source_part->bytes_on_disk;
total_size_marks += source_part->marks_count;
total_size_marks += source_part->getMarksCount();
}
if (!future_part.parts.empty())

View File

@ -41,8 +41,7 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
max_read_buffer_size(max_read_buffer_size),
use_uncompressed_cache(use_uncompressed_cache),
save_marks_in_cache(save_marks_in_cache),
virt_column_names(virt_column_names),
max_block_size_marks(max_block_size_rows / storage.index_granularity)
virt_column_names(virt_column_names)
{
}
@ -77,22 +76,22 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart()
const auto current_max_block_size_rows = max_block_size_rows;
const auto current_preferred_block_size_bytes = preferred_block_size_bytes;
const auto current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
const auto index_granularity = storage.index_granularity;
const auto avg_index_granularity = task->data_part->index_granularity.getAvgGranularity();
const double min_filtration_ratio = 0.00001;
auto estimateNumRows = [current_preferred_block_size_bytes, current_max_block_size_rows,
index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
avg_index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader)
{
if (!current_task.size_predictor)
return current_max_block_size_rows;
/// Calculates the number of rows that will be read using preferred_block_size_bytes.
/// Can't be less than index_granularity.
/// Can't be less than avg_index_granularity.
UInt64 rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
if (!rows_to_read)
return rows_to_read;
rows_to_read = std::max<UInt64>(index_granularity, rows_to_read);
rows_to_read = std::max<UInt64>(avg_index_granularity, rows_to_read);
if (current_preferred_max_column_in_block_size_bytes)
{
@ -103,7 +102,7 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart()
auto rows_to_read_for_max_size_column_with_filtration
= static_cast<UInt64>(rows_to_read_for_max_size_column / filtration_ratio);
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than index_granularity.
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than avg_index_granularity.
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
}
@ -111,8 +110,8 @@ Block MergeTreeBaseSelectBlockInputStream::readFromPart()
if (unread_rows_in_current_granule >= rows_to_read)
return rows_to_read;
UInt64 granule_to_read = (rows_to_read + current_reader.numReadRowsInCurrentGranule() + index_granularity / 2) / index_granularity;
return index_granularity * granule_to_read - current_reader.numReadRowsInCurrentGranule();
UInt64 granule_to_read = (rows_to_read + current_reader.numReadRowsInCurrentGranule() + avg_index_granularity / 2) / avg_index_granularity;
return avg_index_granularity * granule_to_read - current_reader.numReadRowsInCurrentGranule();
};
//if (reader == nullptr) {
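For review context, a standalone restatement of the rounding at the end of estimateNumRows() (an editor's sketch, not code from the commit): the rows already read plus the rows still to read are rounded to a whole number of average-sized granules, then the already-read rows are subtracted.

#include <cassert>
#include <cstdint>

static uint64_t roundToAvgGranules(uint64_t rows_to_read, uint64_t already_read, uint64_t avg_granularity)
{
    /// Same arithmetic as the lambda above, with avg_index_granularity
    /// in place of the former fixed storage.index_granularity.
    uint64_t granules_to_read = (rows_to_read + already_read + avg_granularity / 2) / avg_granularity;
    return avg_granularity * granules_to_read - already_read;
}

int main()
{
    /// avg granularity 8192, 3000 rows already read in the current granule:
    /// (10000 + 3000 + 4096) / 8192 = 2 granules -> 2 * 8192 - 3000 = 13384.
    assert(roundToAvgGranules(10000, 3000, 8192) == 13384);
    return 0;
}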

View File

@ -71,8 +71,6 @@ protected:
using MergeTreeReaderPtr = std::unique_ptr<MergeTreeReader>;
MergeTreeReaderPtr reader;
MergeTreeReaderPtr pre_reader;
UInt64 max_block_size_marks;
};
}

View File

@ -104,8 +104,7 @@ MergeTreeData::MergeTreeData(
bool attach,
BrokenPartCallback broken_part_callback_)
: global_context(context_),
index_granularity(settings_.index_granularity),
index_granularity_bytes(settings_.index_granularity_bytes),
index_granularity_info(settings_),
merging_params(merging_params_),
settings(settings_),
partition_by_ast(partition_by_ast_),
@ -198,6 +197,26 @@ MergeTreeData::MergeTreeData(
}
MergeTreeData::IndexGranularityInfo::IndexGranularityInfo(const MergeTreeSettings & settings)
: fixed_index_granularity(settings.index_granularity)
, index_granularity_bytes(settings.index_granularity_bytes)
{
/// Granularity is fixed
if (index_granularity_bytes == 0)
{
is_adaptive = false;
mark_size_in_bytes = sizeof(UInt64) * 2;
marks_file_extension = ".mrk";
}
else
{
is_adaptive = true;
mark_size_in_bytes = sizeof(UInt64) * 3;
marks_file_extension = ".mrk2";
}
}
static void checkKeyExpression(const ExpressionActions & expr, const Block & sample_block, const String & key_name)
{
for (const ExpressionAction & action : expr.getActions())
@ -1186,7 +1205,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
if (!new_indices_set.count(index.name))
{
out_rename_map["skp_idx_" + index.name + ".idx"] = "";
out_rename_map["skp_idx_" + index.name + part->marks_file_extension] = ""; //TODO(alesap) How to deal with it?
out_rename_map["skp_idx_" + index.name + index_granularity_info.marks_file_extension] = "";
}
}
}
@ -1216,7 +1235,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
if (--stream_counts[file_name] == 0)
{
out_rename_map[file_name + ".bin"] = "";
out_rename_map[file_name + part->marks_file_extension] = "";
out_rename_map[file_name + index_granularity_info.marks_file_extension] = "";
}
}, {});
}
@ -1295,7 +1314,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
//std::cerr << "PART IS NULL:" << (part == nullptr) << std::endl;
//std::cerr << "PART MARKS FILE_EXTENSION:" << part->marks_file_extension << std::endl;
out_rename_map[temporary_file_name + ".bin"] = original_file_name + ".bin";
out_rename_map[temporary_file_name + part->marks_file_extension] = original_file_name + part->marks_file_extension;
out_rename_map[temporary_file_name + index_granularity_info.marks_file_extension] = original_file_name + index_granularity_info.marks_file_extension;
}, {});
}
}
@ -1422,7 +1441,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
compression_codec,
true /* skip_offsets */,
unused_written_offsets,
part->marks_index_granularity);
part->index_granularity);
in.readPrefix();
out.writePrefix();

View File

@ -285,6 +285,32 @@ public:
String getModeName() const;
};
/// Meta information about index granularity
struct IndexGranularityInfo
{
/// Marks file extension '.mrk' or '.mrk2'
String marks_file_extension;
/// Size of one mark in the marks file: two or three UInt64 numbers
UInt8 mark_size_in_bytes;
/// Is the stride in rows between marks non-fixed (i.e. adaptive)?
bool is_adaptive;
/// Approximate size in bytes of one granule
size_t index_granularity_bytes;
/// Fixed number of rows in one granule; used when index_granularity_bytes is zero
size_t fixed_index_granularity;
IndexGranularityInfo(const MergeTreeSettings & settings);
String getMarksFilePath(const String & column_path) const
{
return column_path + marks_file_extension;
}
};
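A hedged standalone sketch (not part of the commit) of the layout this struct encodes: a fixed-granularity mark holds two UInt64 values (offset in the compressed file, offset in the decompressed block); an adaptive mark appends a third UInt64 with the granule's row count, hence 16 vs 24 bytes and '.mrk' vs '.mrk2'.

#include <cassert>
#include <cstdint>
#include <string>

struct GranularityInfoSketch
{
    bool is_adaptive;
    uint8_t mark_size_in_bytes;
    std::string marks_file_extension;

    explicit GranularityInfoSketch(size_t index_granularity_bytes)
        : is_adaptive(index_granularity_bytes != 0)
        , mark_size_in_bytes(sizeof(uint64_t) * (is_adaptive ? 3 : 2))
        , marks_file_extension(is_adaptive ? ".mrk2" : ".mrk")
    {
    }
};

int main()
{
    GranularityInfoSketch fixed(0), adaptive(10 * 1024 * 1024);
    assert(fixed.mark_size_in_bytes == 16 && fixed.marks_file_extension == ".mrk");
    assert(adaptive.mark_size_in_bytes == 24 && adaptive.marks_file_extension == ".mrk2");

    /// This is how loadIndexGranularity() derives the marks count below:
    size_t marks_file_size = 2400;
    assert(marks_file_size / adaptive.mark_size_in_bytes == 100);
    return 0;
}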
/// Attach the table corresponding to the directory in full_path (must end with /), with the given columns.
/// Correctness of names and paths is not checked.
@ -572,8 +598,7 @@ public:
MergeTreeDataFormatVersion format_version;
Context global_context;
const size_t index_granularity;
const size_t index_granularity_bytes;
IndexGranularityInfo index_granularity_info;
/// Merging params - what additional actions to perform during merge.
const MergingParams merging_params;

View File

@ -536,7 +536,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true;
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
/// TODO(alesap): fix me, should be the sum of index_granularity totals over all source parts
size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity_info.fixed_index_granularity;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate);
@ -955,7 +956,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + new_data_part->marks_file_extension);
files_to_skip.insert(stream_name + data.index_granularity_info.marks_file_extension);
};
IDataType::SubstreamPath stream_path;
@ -985,7 +986,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
compression_codec,
/* skip_offsets = */ false,
unused_written_offsets,
source_part->marks_index_granularity
source_part->index_granularity
);
in->readPrefix();
@ -1029,9 +1030,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
}
new_data_part->rows_count = source_part->rows_count;
new_data_part->marks_count = source_part->marks_count;
new_data_part->marks_index_granularity = source_part->marks_index_granularity;
new_data_part->mark_size_in_bytes = source_part->mark_size_in_bytes;
new_data_part->index_granularity = source_part->index_granularity;
new_data_part->index = source_part->index;
new_data_part->partition.assign(source_part->partition);
new_data_part->minmax_idx = source_part->minmax_idx;

View File

@ -137,7 +137,6 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, marks_file_extension(storage.index_granularity_bytes > 0 ? ".mrk2" : ".mrk")
{
}
@ -145,7 +144,6 @@ MergeTreeDataPart::MergeTreeDataPart(const MergeTreeData & storage_, const Strin
: storage(storage_)
, name(name_)
, info(info_)
, marks_file_extension(storage.index_granularity_bytes > 0 ? ".mrk2" : ".mrk")
{
}
@ -173,7 +171,7 @@ MergeTreeDataPart::ColumnSize MergeTreeDataPart::getColumnSizeImpl(
size.data_uncompressed += bin_checksum->second.uncompressed_size;
}
auto mrk_checksum = checksums.files.find(file_name + marks_file_extension);
auto mrk_checksum = checksums.files.find(file_name + storage.index_granularity_info.marks_file_extension);
if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size;
}, {});
@ -473,6 +471,11 @@ void MergeTreeDataPart::renameToDetached(const String & prefix) const
}
UInt64 MergeTreeDataPart::getMarksCount() const
{
return index_granularity.getMarksCount();
}
void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
Poco::Path src(getFullPath());
@ -490,63 +493,58 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
loadColumns(require_columns_checksums);
loadChecksums(require_columns_checksums);
loadMarksIndexGranularity();
loadIndex(); /// Must be called after loadMarksIndexGranularity as it uses the value of `marks_count`
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `marks_count`.
loadIndexGranularity();
loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `index_granularity`.
loadPartitionAndMinMaxIndex();
if (check_consistency)
checkConsistency(require_columns_checksums);
}
void MergeTreeDataPart::loadMarksIndexGranularity()
void MergeTreeDataPart::loadIndexGranularity()
{
//std::cerr << "LOADING MARKS FOR PART:" << getFullPath() << std::endl;
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
const auto & granularity_info = storage.index_granularity_info;
/// We can use any column, it doesn't matter
std::string marks_file_path = getFullPath() + escapeForFileName(columns.front().name);
//std::cerr << "MARKSFILEPATH:" << marks_file_path << std::endl;
if (!Poco::File(marks_file_path + marks_file_extension).exists())
throw Exception("Marks file '" + marks_file_path + "' doesn't exist with extension " + marks_file_extension, ErrorCodes::NO_FILE_IN_DATA_PART);
std::string marks_file_path = granularity_info.getMarksFilePath(getFullPath() + escapeForFileName(columns.front().name));
if (!Poco::File(marks_file_path).exists())
throw Exception("Marks file '" + marks_file_path + "' doesn't exist", ErrorCodes::NO_FILE_IN_DATA_PART);
marks_file_path += marks_file_extension;
size_t marks_file_size = Poco::File(marks_file_path).getSize();
/// old version of marks with static index granularity
if (marks_file_extension == ".mrk")
if (granularity_info.is_adaptive)
{
mark_size_in_bytes = sizeof(size_t) * 2;
std::cerr << "(1)SET MARKS SIZE FOR:" << marks_file_path << "TO:" << mark_size_in_bytes << std::endl;
std::cerr << "(1)SET MARKS SIZE FOR:" << marks_file_path << " TO:" << granularity_info.mark_size_in_bytes << std::endl;
/// TODO(alesap) Replace hardcoded numbers with something better
marks_count = marks_file_size / mark_size_in_bytes;
marks_index_granularity.resize(marks_count, storage.index_granularity); /// all the same
rows_approx = marks_count * storage.index_granularity;
size_t marks_count = marks_file_size / granularity_info.mark_size_in_bytes;
std::cerr << "Marks file size:" << marks_file_size << " Marks count:" << marks_count << std::endl;
index_granularity.resizeWithFixedGranularity(marks_count, granularity_info.fixed_index_granularity); /// all the same
}
else
{
mark_size_in_bytes = sizeof(size_t) * 3;
std::cerr << "(2)SET MARKS SIZE FOR:" << marks_file_path << "TO:" << mark_size_in_bytes<< std::endl;
marks_count = marks_file_size / mark_size_in_bytes;
std::cerr << "(2)SET MARKS SIZE FOR:" << marks_file_path << " TO:" << granularity_info.mark_size_in_bytes << std::endl;
size_t marks_count = marks_file_size / granularity_info.mark_size_in_bytes;
ReadBufferFromFile buffer(marks_file_path, marks_file_size, -1);
marks_index_granularity.resize(marks_count);
size_t i = 0;
while (!buffer.eof())
{
buffer.seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed_file and offset_in_decompressed_block
readIntBinary(marks_index_granularity[i], buffer);
rows_approx += marks_index_granularity[i];
i++;
size_t granularity;
readIntBinary(granularity, buffer);
index_granularity.appendMark(granularity);
}
if (i * mark_size_in_bytes != marks_file_size)
if (index_granularity.getMarksCount() * granularity_info.mark_size_in_bytes != marks_file_size)
throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
}
}
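An editor's sketch of the adaptive branch above as a self-contained '.mrk2' reader, using plain stdio instead of ReadBufferFromFile; it assumes marks are written as raw little-endian UInt64 triples, as writeSingleGranule() does in IMergedBlockOutputStream below.

#include <cstdint>
#include <cstdio>
#include <vector>

std::vector<uint64_t> readMrk2Granularity(const char * marks_file_path)
{
    std::vector<uint64_t> rows_per_granule;
    if (FILE * f = std::fopen(marks_file_path, "rb"))
    {
        /// offset_in_compressed_file, offset_in_decompressed_block, rows_in_granule
        uint64_t mark[3];
        while (std::fread(mark, sizeof(uint64_t), 3, f) == 3)
            rows_per_granule.push_back(mark[2]); /// keep only the rows count
        std::fclose(f);
    }
    return rows_per_granule;
}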
void MergeTreeDataPart::loadIndex()
{
if (!marks_count)
throw Exception("Marks count is not calculated before index", ErrorCodes::LOGICAL_ERROR);
if (index_granularity.empty())
throw Exception("Index granularity is not loaded before index loading", ErrorCodes::LOGICAL_ERROR);
size_t key_size = storage.primary_key_columns.size();
@ -558,22 +556,22 @@ void MergeTreeDataPart::loadIndex()
for (size_t i = 0; i < key_size; ++i)
{
loaded_index[i] = storage.primary_key_data_types[i]->createColumn();
loaded_index[i]->reserve(marks_count);
loaded_index[i]->reserve(index_granularity.getMarksCount());
}
String index_path = getFullPath() + "primary.idx";
ReadBufferFromFile index_file = openForReading(index_path);
for (size_t i = 0; i < marks_count; ++i) //-V756
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i) //-V756
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j], index_file);
for (size_t i = 0; i < key_size; ++i)
{
loaded_index[i]->protect();
if (loaded_index[i]->size() != marks_count)
if (loaded_index[i]->size() != index_granularity.getMarksCount())
throw Exception("Cannot read all data from index file " + index_path
+ "(expected size: " + toString(marks_count) + ", read: " + toString(loaded_index[i]->size()) + ")",
+ "(expected size: " + toString(index_granularity.getMarksCount()) + ", read: " + toString(loaded_index[i]->size()) + ")",
ErrorCodes::CANNOT_READ_ALL_DATA);
}
@ -638,7 +636,7 @@ void MergeTreeDataPart::loadChecksums(bool require)
void MergeTreeDataPart::loadRowsCount()
{
if (marks_count == 0)
if (index_granularity.empty())
{
rows_count = 0;
}
@ -675,7 +673,8 @@ void MergeTreeDataPart::loadRowsCount()
ErrorCodes::LOGICAL_ERROR);
}
size_t last_mark_index_granularity = marks_index_granularity.back();
size_t last_mark_index_granularity = index_granularity.getLastMarkRows();
size_t rows_approx = index_granularity.getTotalRows();
if (!(rows_count <= rows_approx && rows_approx < rows_count + last_mark_index_granularity))
throw Exception(
"Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows, expected "
@ -751,7 +750,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
String mrk_file_name = file_name + marks_file_extension;
String mrk_file_name = file_name + storage.index_granularity_info.marks_file_extension;
String bin_file_name = file_name + ".bin";
if (!checksums.files.count(mrk_file_name))
throw Exception("No " + mrk_file_name + " file checksum for column " + name_type.name + " in part " + path,
@ -815,7 +814,7 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
{
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + marks_file_extension);
Poco::File file(IDataType::getFileNameForStream(name_type.name, substream_path) + storage.index_granularity_info.marks_file_extension);
/// Missing file is Ok for case when new column was added.
if (file.exists())
@ -848,9 +847,9 @@ bool MergeTreeDataPart::hasColumnFiles(const String & column) const
String escaped_column = escapeForFileName(column);
std::cerr << "Escaped name:" << escaped_column << std::endl;
std::cerr << "Marks file extension:" << marks_file_extension << std::endl;
std::cerr << "Marks file extension:" << storage.index_granularity_info.marks_file_extension << std::endl;
return Poco::File(prefix + escaped_column + ".bin").exists()
&& Poco::File(prefix + escaped_column + marks_file_extension).exists();
&& Poco::File(prefix + escaped_column + storage.index_granularity_info.marks_file_extension).exists();
}

View File

@ -4,6 +4,7 @@
#include <Core/Block.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartition.h>
@ -91,7 +92,6 @@ struct MergeTreeDataPart
mutable String relative_path;
size_t rows_count = 0;
size_t marks_count = 0;
std::atomic<UInt64> bytes_on_disk {0}; /// 0 - if not counted;
/// Is used from several threads without locks (it is changed with ALTER).
/// May not contain size of checksums.txt and columns.txt
@ -180,12 +180,7 @@ struct MergeTreeDataPart
/// Amount of rows between marks
/// (the index granularity is always loaded into memory)
using MarksIndexGranularity = std::vector<size_t>;
MarksIndexGranularity marks_index_granularity;
size_t rows_approx = 0;
std::string marks_file_extension;
size_t mark_size_in_bytes;
IndexGranularity index_granularity;
/// Index that for each part stores min and max values of a set of columns. This allows quickly excluding
/// parts based on conditions on these columns imposed by a query.
@ -272,6 +267,7 @@ struct MergeTreeDataPart
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
UInt64 getMarksCount() const;
private:
/// Reads columns names and types from columns.txt
@ -281,7 +277,7 @@ private:
void loadChecksums(bool require);
/// Loads index granularity into memory
void loadMarksIndexGranularity();
void loadIndexGranularity();
/// Loads index file.
void loadIndex();

View File

@ -93,7 +93,7 @@ static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
const MergeTreeData::DataPartsVector & parts, const KeyCondition & key_condition, const Settings & settings) const
{
size_t full_marks_count = 0;
size_t rows_count = 0;
/// We will find out how many rows we would have read without sampling.
LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());
@ -101,7 +101,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
for (size_t i = 0; i < parts.size(); ++i)
{
const MergeTreeData::DataPartPtr & part = parts[i];
MarkRanges ranges = markRangesFromPKRange(part->index, key_condition, settings);
MarkRanges ranges = markRangesFromPKRange(part, key_condition, settings);
/** In order to get a lower bound on the number of rows that match the condition on PK,
* consider only guaranteed full marks.
@ -109,10 +109,11 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
*/
for (size_t j = 0; j < ranges.size(); ++j)
if (ranges[j].end - ranges[j].begin > 2)
full_marks_count += ranges[j].end - ranges[j].begin - 2;
rows_count += part->index_granularity.getRowsCountInRange({ranges[j].begin + 1, ranges[j].end - 1});
}
return full_marks_count * data.index_granularity;
return rows_count;
}
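A standalone illustration of the new lower bound (an editor's sketch, not from the commit): with adaptive granularity the interior marks of a matched range may cover different row counts, so they are summed with getRowsCountInRange instead of multiplying a mark count by the fixed index_granularity. MarkRange is assumed brace-constructible.

#include <Storages/MergeTree/IndexGranularity.h>
#include <cassert>

int main()
{
    DB::IndexGranularity granularity({8192, 8192, 8192, 100});
    DB::MarkRange range{0, 4}; /// the whole part matched the key condition
    size_t guaranteed_rows = 0;
    if (range.end - range.begin > 2)
        guaranteed_rows = granularity.getRowsCountInRange({range.begin + 1, range.end - 1});
    assert(guaranteed_rows == 8192 + 8192); /// marks 1 and 2; marks 0 and 3 may be partial
    return 0;
}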
@ -159,9 +160,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
std::cerr << "START READING FROM PARTS\n";
for (auto part : parts) {
std::cerr << "PartMarks:" << part->marks_file_extension << std::endl;
}
size_t part_index = 0;
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
@ -537,9 +535,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
RangesInDataPart ranges(part, part_index++);
if (data.hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
ranges.ranges = markRangesFromPKRange(part, key_condition, settings);
else
ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}};
ranges.ranges = MarkRanges{MarkRange{0, part->getMarksCount()}};
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
@ -631,16 +629,17 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
const Names & virt_columns,
const Settings & settings) const
{
size_t average_index_granularity = getAvgGranularityForAllPartsRanges(parts);
const size_t min_marks_for_concurrent_read =
(settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
const size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
(settings.merge_tree_min_rows_for_concurrent_read + average_index_granularity - 1) / average_index_granularity;
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
size_t sum_marks = 0;
size_t total_rows = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
total_rows += parts[i].getRowsCount();
/// Let the ranges be listed from right to left so that the leftmost range can be dropped using `pop_back()`.
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
@ -650,7 +649,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
sum_marks += sum_marks_in_parts[i];
}
if (sum_marks > max_marks_to_use_cache)
if (total_rows > settings.merge_tree_max_rows_to_use_cache)
use_uncompressed_cache = false;
BlockInputStreams res;
@ -666,7 +665,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
/// Let's estimate total number of rows for progress bar.
const size_t total_rows = data.index_granularity * sum_marks;
LOG_TRACE(log, "Reading approx. " << total_rows << " rows with " << num_streams << " streams");
for (size_t i = 0; i < num_streams; ++i)
@ -773,15 +771,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
const Names & virt_columns,
const Settings & settings) const
{
const size_t max_marks_to_use_cache =
(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
size_t sum_marks = 0;
size_t sum_rows = 0;
for (size_t i = 0; i < parts.size(); ++i)
for (size_t j = 0; j < parts[i].ranges.size(); ++j)
sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;
sum_rows += parts[i].getRowsCount();
if (sum_marks > max_marks_to_use_cache)
if (sum_rows > settings.merge_tree_max_rows_to_use_cache)
use_uncompressed_cache = false;
BlockInputStreams to_merge;
@ -874,11 +868,12 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition(
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.
/// In other words, it removes subranges from whole range, that definitely could not contain required keys.
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
const MergeTreeData::DataPart::Index & index, const KeyCondition & key_condition, const Settings & settings) const
const MergeTreeData::DataPartPtr & part, const KeyCondition & key_condition, const Settings & settings) const
{
MarkRanges res;
size_t marks_count = index.at(0)->size();
size_t marks_count = part->index_granularity.getMarksCount();
const auto & index = part->index;
if (marks_count == 0)
return res;
@ -890,7 +885,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
else
{
size_t used_key_size = key_condition.getMaxKeyColumn() + 1;
size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;
size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + part->index_granularity.getAvgGranularity() - 1) / part->index_granularity.getAvgGranularity();
/** There will always be disjoint suspicious segments on the stack, the leftmost one at the top (back).
* At each step, take the left segment and check if it fits.
@ -972,13 +967,14 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
return ranges;
}
const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;
const size_t avg_granularity = part->index_granularity.getAvgGranularity();
const size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + avg_granularity - 1) / avg_granularity;
size_t granules_dropped = 0;
MergeTreeIndexReader reader(
index, part,
((part->marks_count + index->granularity - 1) / index->granularity),
((part->getMarksCount() + index->granularity - 1) / index->granularity),
ranges);
MarkRanges res;

View File

@ -78,7 +78,7 @@ private:
const Context & context) const;
MarkRanges markRangesFromPKRange(
const MergeTreeData::DataPart::Index & index,
const MergeTreeData::DataPartPtr & part,
const KeyCondition & key_condition,
const Settings & settings) const;

View File

@ -10,7 +10,7 @@ MergeTreeIndexReader::MergeTreeIndexReader(
part->getFullPath() + index->getFileName(), ".idx", marks_count,
all_mark_ranges, nullptr, false, nullptr,
part->getFileSizeOrZero(index->getFileName() + ".idx"), 0, DBMS_DEFAULT_BUFFER_SIZE,
part->marks_file_extension, part->mark_size_in_bytes,
part->storage.index_granularity_info.marks_file_extension, part->storage.index_granularity_info.mark_size_in_bytes,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
{
stream.seekToStart();

View File

@ -24,7 +24,7 @@ size_t MergeTreeRangeReader::DelayedStream::position() const
{
size_t num_rows_before_current_mark = 0;
for (size_t i = 0; i < current_mark; ++i)
num_rows_before_current_mark += merge_tree_reader->data_part->marks_index_granularity[i];
num_rows_before_current_mark += merge_tree_reader->data_part->index_granularity.getMarkRows(i);
return num_rows_before_current_mark + current_offset + num_delayed_rows;
}
@ -55,7 +55,7 @@ size_t MergeTreeRangeReader::DelayedStream::read(Block & block, size_t from_mark
{
size_t num_rows_before_from_mark = 0;
for (size_t i = 0; i < from_mark; ++i)
num_rows_before_from_mark += merge_tree_reader->data_part->marks_index_granularity[i];
num_rows_before_from_mark += merge_tree_reader->data_part->index_granularity.getMarkRows(i);
/// We already stand accurately in required position,
/// so because stream is lazy, we don't read anything
@ -83,9 +83,9 @@ size_t MergeTreeRangeReader::DelayedStream::finalize(Block & block)
/// We need to skip some rows before reading
if (current_offset && !continue_reading)
{
for (size_t mark_num : ext::range(current_mark, merge_tree_reader->data_part->marks_count))
for (size_t mark_num : ext::range(current_mark, merge_tree_reader->data_part->getMarksCount()))
{
size_t mark_index_granularity = merge_tree_reader->data_part->marks_index_granularity[mark_num];
size_t mark_index_granularity = merge_tree_reader->data_part->index_granularity.getMarkRows(mark_num);
if (current_offset >= mark_index_granularity)
{
current_offset -= mark_index_granularity;
@ -119,16 +119,17 @@ MergeTreeRangeReader::Stream::Stream(
: current_mark(from_mark), offset_after_current_mark(0)
, last_mark(to_mark)
, merge_tree_reader(merge_tree_reader)
, current_mark_index_granularity(merge_tree_reader->data_part->marks_index_granularity[from_mark])
, current_mark_index_granularity(merge_tree_reader->data_part->index_granularity.getMarkRows(from_mark))
, stream(from_mark, merge_tree_reader)
{
if (from_mark >= merge_tree_reader->data_part->marks_index_granularity.size())
size_t marks_count = merge_tree_reader->data_part->index_granularity.getMarksCount();
if (from_mark >= marks_count)
throw Exception("Trying create stream to read from mark №"+ toString(current_mark) + " but total marks count is "
+ toString(merge_tree_reader->data_part->marks_index_granularity.size()), ErrorCodes::LOGICAL_ERROR);
+ toString(marks_count), ErrorCodes::LOGICAL_ERROR);
if (last_mark > merge_tree_reader->data_part->marks_index_granularity.size())
if (last_mark > marks_count)
throw Exception("Trying create stream to read to mark №"+ toString(current_mark) + " but total marks count is "
+ toString(merge_tree_reader->data_part->marks_index_granularity.size()), ErrorCodes::LOGICAL_ERROR);
+ toString(marks_count), ErrorCodes::LOGICAL_ERROR);
}
void MergeTreeRangeReader::Stream::checkNotFinished() const
@ -157,10 +158,10 @@ void MergeTreeRangeReader::Stream::toNextMark()
{
++current_mark;
size_t total_marks_count = merge_tree_reader->data_part->marks_index_granularity.size();
size_t total_marks_count = merge_tree_reader->data_part->index_granularity.getMarksCount();
/// TODO(alesap) clumsy logic, fixme
if (current_mark < total_marks_count)
current_mark_index_granularity = merge_tree_reader->data_part->marks_index_granularity[current_mark];
current_mark_index_granularity = merge_tree_reader->data_part->index_granularity.getMarkRows(current_mark);
else
current_mark_index_granularity = 0; /// HACK?
///throw Exception("Trying to read from mark " + toString(current_mark) + ", but total marks count " + toString(total_marks_count), ErrorCodes::LOGICAL_ERROR);
@ -446,7 +447,7 @@ size_t MergeTreeRangeReader::numPendingRowsInCurrentGranule() const
size_t MergeTreeRangeReader::Stream::numPendingRows() const
{
size_t rows_between_marks = 0;
for (size_t mark = current_mark; mark < last_mark; ++mark)
rows_between_marks += merge_tree_reader->data_part->marks_index_granularity[mark];
rows_between_marks += merge_tree_reader->data_part->index_granularity.getMarkRows(mark);
return rows_between_marks - offset_after_current_mark;
}

View File

@ -36,9 +36,15 @@ MergeTreeReadPool::MergeTreeReadPool(
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(parts, check_columns);
fillPerThreadInfo(threads, sum_marks, per_part_sum_marks, parts, min_marks_for_concurrent_read);
avg_parts_granularity = getAvgGranularityForAllPartsRanges(parts);
}
size_t MergeTreeReadPool::getAvgGranularityForReadingParts() const
{
return avg_parts_granularity;
}
MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names)
{
const std::lock_guard lock{mutex};

View File

@ -83,6 +83,8 @@ public:
/// This method tells which mark ranges we have to read if we start from @from mark range
MarkRanges getRestMarks(const std::string & part_path, const MarkRange & from) const;
size_t getAvgGranularityForReadingParts() const;
Block getHeader() const;
private:
@ -104,6 +106,7 @@ private:
std::vector<char> per_part_should_reorder;
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
PrewhereInfoPtr prewhere_info;
size_t avg_parts_granularity;
struct Part
{

View File

@ -173,11 +173,11 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type,
return;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
aio_threshold, max_read_buffer_size,
data_part->marks_file_extension, data_part->mark_size_in_bytes,
storage.index_granularity_info.marks_file_extension, storage.index_granularity_info.mark_size_in_bytes,
profile_callback, clock_type));
};

View File

@ -46,15 +46,15 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
for (const auto & range : all_mark_ranges)
total_marks_count += range.end - range.begin;
size_t total_rows = total_marks_count * storage.index_granularity;
size_t total_rows = data_part->index_granularity.getTotalRows();
if (!quiet)
LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << data_part->name
<< ", approx. " << total_rows
<< (all_mark_ranges.size() > 1
? ", up to " + toString((all_mark_ranges.back().end - all_mark_ranges.front().begin) * storage.index_granularity)
? ", up to " + toString(data_part->index_granularity.getRowsCountInRanges(all_mark_ranges))
: "")
<< " rows starting from " << all_mark_ranges.front().begin * storage.index_granularity);
<< " rows starting from " << data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));
addTotalRowsApprox(total_rows);

View File

@ -25,7 +25,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
if (!quiet)
{
std::stringstream message;
message << "Reading " << data_part->marks_count << " marks from part " << data_part->name
message << "Reading " << data_part->getMarksCount() << " marks from part " << data_part->name
<< ", total " << data_part->rows_count
<< " rows starting from the beginning of the part, columns: ";
for (size_t i = 0, size = columns_to_read.size(); i < size; ++i)
@ -56,7 +56,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
reader = std::make_unique<MergeTreeReader>(
data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr,
mark_cache.get(), /* save_marks_in_cache = */ false, storage,
MarkRanges{MarkRange(0, data_part->marks_count)},
MarkRanges{MarkRange(0, data_part->getMarksCount())},
/* bytes to use AIO (this is hack) */
read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
DBMS_DEFAULT_BUFFER_SIZE);
@ -91,7 +91,7 @@ try
Block res;
if (!isCancelled() && current_row < data_part->rows_count)
{
size_t rows_to_read = data_part->marks_index_granularity[current_mark];
size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
bool continue_reading = (current_mark != 0);
size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, res);

View File

@ -29,8 +29,9 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
/// round min_marks_to_read up to nearest multiple of block_size expressed in marks
if (max_block_size_rows)
{
min_marks_to_read = (min_marks_to_read_ * storage.index_granularity + max_block_size_rows - 1)
/ max_block_size_rows * max_block_size_rows / storage.index_granularity;
size_t avg_granularity = pool->getAvgGranularityForReadingParts();
min_marks_to_read = (min_marks_to_read_ * avg_granularity + max_block_size_rows - 1)
/ max_block_size_rows * max_block_size_rows / avg_granularity;
}
else
min_marks_to_read = min_marks_to_read_;
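Worked numbers for the rounding above (an editor's sketch, not from the commit): min_marks_to_read_ is converted to rows via the pool's average granularity, rounded up to whole max_block_size_rows blocks, then converted back to marks.

#include <cassert>
#include <cstddef>

static size_t roundMinMarks(size_t min_marks, size_t avg_granularity, size_t max_block_size_rows)
{
    /// Same formula as above: marks -> rows -> whole blocks -> marks.
    return (min_marks * avg_granularity + max_block_size_rows - 1)
        / max_block_size_rows * max_block_size_rows / avg_granularity;
}

int main()
{
    /// 24 marks of ~8192 rows = 196608 rows = exactly three 65536-row blocks,
    /// which converts back to 24 marks.
    assert(roundMinMarks(24, 8192, 65536) == 24);
    /// 3 marks = 24576 rows -> rounded up to one 65536-row block -> 8 marks.
    assert(roundMinMarks(3, 8192, 65536) == 8);
    return 0;
}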

View File

@ -21,10 +21,6 @@ namespace
{
constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto FIXED_MARKS_FILE_EXTENSION = ".mrk";
constexpr auto ADAPTIVE_MARKS_FILE_EXTENSION = ".mrk2";
constexpr auto FIXED_MARK_BYTE_SIZE = sizeof(MarkInCompressedFile);
constexpr auto ADAPTIVE_MARK_BYTE_SIZE = sizeof(MarkInCompressedFile) + sizeof(size_t);
constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
@ -39,13 +35,13 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
CompressionCodecPtr codec_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const std::vector<size_t> & index_granularity_)
const IndexGranularity & index_granularity_)
: storage(storage_)
, min_compress_block_size(min_compress_block_size_)
, max_compress_block_size(max_compress_block_size_)
, aio_threshold(aio_threshold_)
, marks_file_extension(storage.index_granularity_bytes == 0 ? FIXED_MARKS_FILE_EXTENSION : ADAPTIVE_MARKS_FILE_EXTENSION)
, mark_size_in_bytes(storage.index_granularity_bytes == 0 ? FIXED_MARK_BYTE_SIZE : ADAPTIVE_MARK_BYTE_SIZE)
, marks_file_extension(storage.index_granularity_info.marks_file_extension)
, mark_size_in_bytes(storage.index_granularity_info.mark_size_in_bytes)
, blocks_are_granules_size(blocks_are_granules_size_)
, index_granularity(index_granularity_)
, compute_granularity(index_granularity.empty())
@ -112,15 +108,15 @@ IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
void fillIndexGranularityImpl(
const Block & block,
size_t index_granularity_bytes,
size_t index_granularity_rows,
size_t fixed_index_granularity_rows,
bool blocks_are_granules,
size_t index_offset,
std::vector<size_t> & index_granularity)
IndexGranularity & index_granularity)
{
size_t rows_in_block = block.rows();
size_t index_granularity_for_block;
if (index_granularity_bytes == 0)
index_granularity_for_block = index_granularity_rows;
index_granularity_for_block = fixed_index_granularity_rows;
else
{
size_t block_size_in_memory = block.bytes();
@ -144,15 +140,15 @@ void fillIndexGranularityImpl(
//std::cerr << "GRANULARITY SIZE IN ROWS:"<< index_granularity_for_block << std::endl;
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
index_granularity.push_back(index_granularity_for_block);
index_granularity.appendMark(index_granularity_for_block);
}
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
fillIndexGranularityImpl(
block,
storage.index_granularity_bytes,
storage.index_granularity,
storage.index_granularity_info.index_granularity_bytes,
storage.index_granularity_info.fixed_index_granularity,
blocks_are_granules_size,
index_offset,
index_granularity);
@ -193,7 +189,7 @@ size_t IMergedBlockOutputStream::writeSingleGranule(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
if (storage.index_granularity_info.is_adaptive)
writeIntBinary(number_of_rows, stream.marks);
}, serialize_settings.path);
}
@ -251,12 +247,12 @@ std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
}
else
{
if (index_granularity.size() <= current_column_mark)
if (index_granularity.getMarksCount() <= current_column_mark)
throw Exception(
"Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.size()),
"Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.getMarksCount()),
ErrorCodes::LOGICAL_ERROR);
rows_to_write = index_granularity[current_column_mark];
rows_to_write = index_granularity.getMarkRows(current_column_mark);
}
current_row = writeSingleGranule(
@ -349,14 +345,13 @@ MergedBlockOutputStream::MergedBlockOutputStream(
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec_,
bool blocks_are_granules_size_,
const std::vector<size_t> & index_granularity_)
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
storage_, storage_.global_context.getSettings().min_compress_block_size,
storage_.global_context.getSettings().max_compress_block_size, default_codec_,
storage_.global_context.getSettings().min_bytes_to_use_direct_io,
blocks_are_granules_size_,
index_granularity_),
{}),
columns_list(columns_list_), part_path(part_path_)
{
init();
@ -374,12 +369,11 @@ MergedBlockOutputStream::MergedBlockOutputStream(
CompressionCodecPtr default_codec_,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const std::vector<size_t> & index_granularity_)
bool blocks_are_granules_size_)
: IMergedBlockOutputStream(
storage_, storage_.global_context.getSettings().min_compress_block_size,
storage_.global_context.getSettings().max_compress_block_size, default_codec_,
aio_threshold_, blocks_are_granules_size_, index_granularity_),
aio_threshold_, blocks_are_granules_size_, {}),
columns_list(columns_list_), part_path(part_path_)
{
init();
@ -523,15 +517,12 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->rows_count = rows_count;
//std::cerr << "SETTING CURRENT MARK FOR PART:" << part_path << " to " << current_mark << std::endl;
new_part->marks_count = current_mark;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
new_part->marks_file_extension = marks_file_extension;
new_part->marks_index_granularity.swap(index_granularity);
new_part->mark_size_in_bytes = mark_size_in_bytes;
new_part->index_granularity = index_granularity;
}
void MergedBlockOutputStream::init()
@ -684,6 +675,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto & stream = *skip_indices_streams[i];
size_t prev_pos = 0;
size_t current_mark = 0;
while (prev_pos < rows)
{
UInt64 limit = 0;
@ -693,7 +685,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
else
{
limit = storage.index_granularity;
limit = index_granularity.getMarkRows(current_mark); /// TODO(alesap)
if (skip_indices_aggregators[i]->empty())
{
skip_indices_aggregators[i] = index->createIndexAggregator();
@ -722,6 +714,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
prev_pos = pos;
current_mark++;
}
}
}
@ -753,8 +746,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
//std::cerr << "I:" << i << " Total rows:" << rows << std::endl;
//std::cerr << "Increment current mark:" << current_mark << std::endl;
++current_mark;
if (current_mark < index_granularity.size())
i += index_granularity[current_mark];
if (current_mark < index_granularity.getMarksCount())
i += index_granularity.getMarkRows(current_mark);
else
break;
}
@ -772,7 +765,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns,
const std::vector<size_t> & index_granularity_)
const IndexGranularity & index_granularity_)
: IMergedBlockOutputStream(
storage_, storage_.global_context.getSettings().min_compress_block_size,
storage_.global_context.getSettings().max_compress_block_size, default_codec_,

View File

@ -1,5 +1,6 @@
#pragma once
#include <Storages/MergeTree/IndexGranularity.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/HashingWriteBuffer.h>
@ -23,7 +24,7 @@ public:
CompressionCodecPtr default_codec_,
size_t aio_threshold_,
bool blocks_are_granules_size_,
const std::vector<size_t> & index_granularity_);
const IndexGranularity & index_granularity_);
using WrittenOffsetColumns = std::set<std::string>;
@ -118,7 +119,7 @@ protected:
const size_t mark_size_in_bytes;
const bool blocks_are_granules_size;
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
const bool compute_granularity;
CompressionCodecPtr codec;
@ -136,8 +137,7 @@ public:
String part_path_,
const NamesAndTypesList & columns_list_,
CompressionCodecPtr default_codec_,
bool blocks_are_granules_size_ = false,
const std::vector<size_t> & index_granularity_ = {});
bool blocks_are_granules_size_ = false);
MergedBlockOutputStream(
MergeTreeData & storage_,
@ -146,8 +146,7 @@ public:
CompressionCodecPtr default_codec_,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
size_t aio_threshold_,
bool blocks_are_granules_size_ = false,
const std::vector<size_t> & index_granularity_ = {});
bool blocks_are_granules_size_ = false);
std::string getPartPath() const;
@ -169,7 +168,7 @@ public:
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
const std::vector<size_t> & getIndexGranularity() const
const IndexGranularity & getIndexGranularity() const
{
return index_granularity;
}
@ -210,7 +209,7 @@ public:
MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
CompressionCodecPtr default_codec_, bool skip_offsets_,
WrittenOffsetColumns & already_written_offset_columns,
const std::vector<size_t> & index_granularity_);
const IndexGranularity & index_granularity_);
Block getHeader() const override { return header; }
void write(const Block & block) override;

View File

@ -21,9 +21,35 @@ struct RangesInDataPart
: data_part{data_part}, part_index_in_query{part_index_in_query}, ranges{ranges}
{
}
size_t getAvgGranularityForRanges() const
{
size_t total_rows = 0;
size_t total_marks = 0;
for (const auto & range : ranges)
{
total_rows += data_part->index_granularity.getRowsCountInRange(range);
total_marks += (range.end - range.begin);
}
return total_rows / total_marks;
}
size_t getRowsCount() const
{
return data_part->index_granularity.getRowsCountInRanges(ranges);
}
};
using RangesInDataParts = std::vector<RangesInDataPart>;
inline size_t getAvgGranularityForAllPartsRanges(const RangesInDataParts & parts_ranges)
{
size_t sum_of_averages = 0;
for (const auto & part : parts_ranges)
sum_of_averages += part.getAvgGranularityForRanges();
return sum_of_averages / parts_ranges.size();
}
}
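A standalone sketch (not part of the commit) contrasting what getAvgGranularityForAllPartsRanges() computes, an unweighted average of per-part averages, with a mark-weighted average; the two diverge when part sizes are skewed.

#include <Storages/MergeTree/IndexGranularity.h>
#include <cassert>

int main()
{
    DB::IndexGranularity big({8192, 8192, 8192, 8192}); /// 4 marks, avg 8192
    DB::IndexGranularity small({100});                  /// 1 mark,  avg 100

    /// What the helper above computes for these two parts:
    size_t avg_of_avgs = (big.getAvgGranularity() + small.getAvgGranularity()) / 2;
    /// A mark-weighted alternative:
    size_t weighted = (big.getTotalRows() + small.getTotalRows())
        / (big.getMarksCount() + small.getMarksCount());

    assert(avg_of_avgs == 4146); /// (8192 + 100) / 2
    assert(weighted == 6573);    /// 32868 / 5
    return 0;
}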

View File

@ -28,7 +28,7 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
date_column = data.minmax_idx_columns[data.minmax_idx_date_column_pos];
sampling_expression = formattedAST(data.sample_by_ast);
index_granularity = data.index_granularity;
index_granularity = data.index_granularity_info.fixed_index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;

View File

@ -4,6 +4,7 @@
#include <Poco/File.h>
#include <Poco/DirectoryIterator.h>
#include <Storages/MergeTree/IndexGranularity.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Compression/CompressedReadBuffer.h>
@ -42,7 +43,7 @@ public:
String mrk_file_path;
private:
String marks_file_extension;
const std::vector<size_t> & index_granularity;
const IndexGranularity & index_granularity;
ReadBufferFromFile file_buf;
HashingReadBuffer compressed_hashing_buf;
CompressedReadBuffer uncompressing_buf;
@ -55,7 +56,7 @@ private:
public:
HashingReadBuffer mrk_hashing_buf;
Stream(const String & path, const String & base_name, const String & bin_file_extension_, const String & mrk_file_extension_, const std::vector<size_t> & index_granularity_)
Stream(const String & path, const String & base_name, const String & bin_file_extension_, const String & mrk_file_extension_, const IndexGranularity & index_granularity_)
:
base_name(base_name),
bin_file_path(path + base_name + bin_file_extension_),
@ -81,7 +82,7 @@ public:
if (marks_file_extension == ".mrk2")
readIntBinary(mrk_rows, mrk_hashing_buf);
else
mrk_rows = index_granularity[mark_position];
mrk_rows = index_granularity.getMarkRows(mark_position);
//std::cerr << "MRK ROWS:" << mrk_rows << std::endl;
bool has_alternative_mark = false;
@ -117,8 +118,8 @@ public:
data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
if (mrk_mark != data_mark || mrk_rows != index_granularity[mark_position])
throw Exception("Incorrect mark: " + data_mark.toStringWithRows(index_granularity[mark_position]) +
if (mrk_mark != data_mark || mrk_rows != index_granularity.getMarkRows(mark_position))
throw Exception("Incorrect mark: " + data_mark.toStringWithRows(index_granularity.getMarkRows(mark_position)) +
(has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " +
mrk_mark.toStringWithRows(mrk_rows) + " in " + mrk_file_path + " file", ErrorCodes::INCORRECT_MARK);
@ -157,7 +158,7 @@ public:
MergeTreeData::DataPart::Checksums checkDataPart(
const String & full_path,
const std::vector<size_t> adaptive_index_granularity,
const IndexGranularity & adaptive_index_granularity,
const size_t fixed_granularity,
const String marks_file_extension,
bool require_checksums,
@ -344,7 +345,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
}
}, settings.path);
size_t rows_after_mark = adaptive_index_granularity[mark_num];
size_t rows_after_mark = adaptive_index_granularity.getMarkRows(mark_num);
std::cerr << "rows after mark:" << rows_after_mark << std::endl;
std::cerr << "mark_num:" << mark_num << std::endl;
++mark_num;
@ -369,7 +370,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
size_t read_size = tmp_column->size();
column_size += read_size;
if (read_size < rows_after_mark || mark_num == adaptive_index_granularity.size())
if (read_size < rows_after_mark || mark_num == adaptive_index_granularity.getMarksCount())
break;
else if (marks_eof)
throw Exception("Unexpected end of mrk file while reading column " + name_type.name, ErrorCodes::CORRUPTED_DATA);
@ -407,18 +408,11 @@ MergeTreeData::DataPart::Checksums checkDataPart(
if (!primary_key_data_types.empty())
{
size_t expected_marks = adaptive_index_granularity.size();
size_t expected_marks = adaptive_index_granularity.getMarksCount();
if (expected_marks != marks_in_primary_key)
{
String info_about_granularity;
if (std::all_of(adaptive_index_granularity.begin(), adaptive_index_granularity.end(), [&fixed_granularity](size_t i) { return i == fixed_granularity; } ))
info_about_granularity = "fixed=" + toString(fixed_granularity);
else
info_about_granularity = "adaptive";
throw Exception("Size of primary key doesn't match expected number of marks."
" Number of rows in columns: " + toString(*rows)
+ ", index_granularity: " + info_about_granularity
+ ", expected number of marks: " + toString(expected_marks)
+ ", size of primary key: " + toString(marks_in_primary_key),
ErrorCodes::CORRUPTED_DATA);
@ -449,9 +443,9 @@ MergeTreeData::DataPart::Checksums checkDataPart(
{
return checkDataPart(
data_part->getFullPath(),
data_part->marks_index_granularity,
data_part->storage.index_granularity,
data_part->marks_file_extension,
data_part->index_granularity,
data_part->storage.index_granularity_info.fixed_index_granularity,
data_part->storage.index_granularity_info.marks_file_extension,
require_checksums,
primary_key_data_types,
indices,

View File

@ -21,7 +21,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
MergeTreeData::DataPart::Checksums checkDataPart(
const String & full_path,
const std::vector<size_t> adaptive_index_granularity,
const IndexGranularity & index_granularity,
const size_t fixed_granularity,
const String marks_file_extension,
bool require_checksums,

View File

@ -68,7 +68,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
}
columns[i++]->insert(part->name);
columns[i++]->insert(part_state == State::Committed);
columns[i++]->insert(part->marks_count);
columns[i++]->insert(part->getMarksCount());
columns[i++]->insert(part->rows_count);
columns[i++]->insert(part->bytes_on_disk.load(std::memory_order_relaxed));
columns[i++]->insert(columns_size.data_compressed);

View File

@ -106,7 +106,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
}
columns[j++]->insert(part->name);
columns[j++]->insert(part_state == State::Committed);
columns[j++]->insert(part->marks_count);
columns[j++]->insert(part->getMarksCount());
columns[j++]->insert(part->rows_count);
columns[j++]->insert(part->bytes_on_disk.load(std::memory_order_relaxed));

View File

@ -28,44 +28,44 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
auto block1 = getBlockWithSize(80, 8);
EXPECT_EQ(block1.bytes(), 80);
{ /// Granularity bytes are not set. Take default index_granularity.
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
fillIndexGranularityImpl(block1, 0, 100, false, 0, index_granularity);
EXPECT_EQ(index_granularity.size(), 1);
EXPECT_EQ(index_granularity[0], 100);
EXPECT_EQ(index_granularity.getMarksCount(), 1);
EXPECT_EQ(index_granularity.getMarkRows(0), 100);
}
{ /// Granule size is less than block size. Block contains multiple granules.
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
fillIndexGranularityImpl(block1, 16, 100, false, 0, index_granularity);
EXPECT_EQ(index_granularity.size(), 5); /// First granule with 8 rows, and second with 1 row
for (auto granule_size : index_granularity)
EXPECT_EQ(granule_size, 2);
EXPECT_EQ(index_granularity.getMarksCount(), 5); /// Five granules of two rows each
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), 2);
}
{ /// Granule size is more than block size. The whole block (and maybe more) can be placed in a single granule.
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
fillIndexGranularityImpl(block1, 512, 100, false, 0, index_granularity);
EXPECT_EQ(index_granularity.size(), 1);
for (auto granule_size : index_granularity)
EXPECT_EQ(granule_size, 64);
EXPECT_EQ(index_granularity.getMarksCount(), 1);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), 64);
}
{ /// Blocks with granule size
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
fillIndexGranularityImpl(block1, 1, 1, true, 0, index_granularity);
EXPECT_EQ(index_granularity.size(), 1);
for (auto granule_size : index_granularity)
EXPECT_EQ(granule_size, block1.rows());
EXPECT_EQ(index_granularity.getMarksCount(), 1);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), block1.rows());
}
{ /// Shift in index offset
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
fillIndexGranularityImpl(block1, 16, 100, false, 6, index_granularity);
EXPECT_EQ(index_granularity.size(), 2);
for (auto granule_size : index_granularity)
EXPECT_EQ(granule_size, 2);
EXPECT_EQ(index_granularity.getMarksCount(), 2);
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), 2);
}
}
@ -76,26 +76,26 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
auto block1 = getBlockWithSize(65536, 8);
auto block2 = getBlockWithSize(65536, 8);
auto block3 = getBlockWithSize(65536, 8);
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
for (const auto & block : {block1, block2, block3})
fillIndexGranularityImpl(block, 1024, 0, false, 0, index_granularity);
EXPECT_EQ(index_granularity.size(), 192); /// granules
for (auto granule_size_in_rows : index_granularity)
EXPECT_EQ(granule_size_in_rows, 128);
EXPECT_EQ(index_granularity.getMarksCount(), 192); /// granules
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), 128);
}
{ /// Three blocks of different size
auto block1 = getBlockWithSize(65536, 32);
auto block2 = getBlockWithSize(32768, 32);
auto block3 = getBlockWithSize(2048, 32);
EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 3136);
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
for (const auto & block : {block1, block2, block3})
fillIndexGranularityImpl(block, 1024, 0, false, 0, index_granularity);
EXPECT_EQ(index_granularity.size(), 98); /// granules
for (auto granule_size_in_rows : index_granularity)
EXPECT_EQ(granule_size_in_rows, 32);
EXPECT_EQ(index_granularity.getMarksCount(), 98); /// granules
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), 32);
}
{ /// Three small blocks
@ -105,16 +105,16 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), (2048 + 4096 + 8192) / 32);
std::vector<size_t> index_granularity;
IndexGranularity index_granularity;
size_t index_offset = 0;
for (const auto & block : {block1, block2, block3})
{
fillIndexGranularityImpl(block, 16384, 0, false, index_offset, index_granularity);
index_offset = index_granularity.back() - block.rows();
index_offset = index_granularity.getLastMarkRows() - block.rows();
}
EXPECT_EQ(index_granularity.size(), 1); /// granules
for (auto granule_size_in_rows : index_granularity)
EXPECT_EQ(granule_size_in_rows, 512);
EXPECT_EQ(index_granularity.getMarksCount(), 1); /// granules
for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
EXPECT_EQ(index_granularity.getMarkRows(i), 512);
}
}

View File

@ -1,6 +1,7 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/DirectoryIterator.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/IndexGranularity.h>
#include <Common/Exception.h>
using namespace DB;
@ -20,9 +21,10 @@ Poco::Path getMarksFile(const std::string & part_path)
throw Exception("Cannot find any mark file in directory " + part_path, DB::ErrorCodes::METRIKA_OTHER_ERROR);
}
std::vector<size_t> readGranularity(const Poco::Path & mrk_file_path, size_t fixed_granularity)
IndexGranularity readGranularity(const Poco::Path & mrk_file_path, size_t fixed_granularity)
{
std::vector<size_t> result;
IndexGranularity result;
auto extension = mrk_file_path.getExtension();
DB::ReadBufferFromFile mrk_in(mrk_file_path.toString());
@ -41,7 +43,7 @@ std::vector<size_t> readGranularity(const Poco::Path & mrk_file_path, size_t fix
}
else
index_granularity_rows = fixed_granularity;
result.push_back(index_granularity_rows);
result.appendMark(index_granularity_rows);
}
return result;
}
@ -66,10 +68,10 @@ int main(int argc, char ** argv)
auto mrk_file_path = getMarksFile(full_path);
size_t fixed_granularity{parse<size_t>(argv[3])};
auto adaptive_granularity = readGranularity(mrk_file_path, fixed_granularity);
std::cerr << "adaptive size:" << adaptive_granularity.size() << std::endl;
for (size_t i : adaptive_granularity)
std::cerr << "adaptive size:" << adaptive_granularity.getMarksCount() << std::endl;
for (size_t i = 0; i < adaptive_granularity.getMarksCount(); ++i)
{
std::cerr << "Granularity:" << i << std::endl;
std::cerr << "Granularity:" << adaptive_granularity.getMarkRows(i) << std::endl;
}
auto marks_file_extension = "." + mrk_file_path.getExtension();
bool require_checksums = parse<bool>(argv[2]);