Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-28 02:21:59 +00:00)

Commit: 258e8d61fb
Parent: 6f673407ff

    polymorphic parts (development) cleanup
@@ -184,8 +184,8 @@ std::optional<size_t> IMergeTreeDataPart::getColumnPosition(const String & colum
{
    if (!sample_block.has(column_name))
        return {};
    return sample_block.getPositionByName(column_name);
}

DayNum IMergeTreeDataPart::getMinDate() const
{

@@ -583,7 +583,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
    {
        is_frozen = !poco_file_path.canWrite();
        ReadBufferFromFile file = openForReading(path);
        columns.readText(file);
    }

    index_granularity_info.initialize(storage, getType(), columns.size());

@@ -597,7 +597,7 @@ void IMergeTreeDataPart::loadColumnSizes()
    if (columns_num == 0)
        throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

    auto column_sizes_path = getFullPath() + "columns_sizes.txt";
    auto columns_sizes_file = Poco::File(column_sizes_path);
    if (!columns_sizes_file.exists())

@@ -772,7 +772,7 @@ void IMergeTreeDataPart::checkConsistency(bool /* require_part_metadata */) cons

String IMergeTreeDataPart::typeToString(Type type)
{
-   switch(type)
+   switch (type)
    {
        case Type::WIDE:
            return "Wide";
@@ -64,7 +64,7 @@ public:
        const CompressionCodecPtr & default_codec_,
        const MergeTreeWriterSettings & writer_settings,
        const MergeTreeIndexGranularity & computed_index_granularity = {}) const = 0;

    virtual bool isStoredOnDisk() const = 0;

@@ -168,7 +168,7 @@ public:
    std::atomic<UInt64> bytes_on_disk {0};  /// 0 - if not counted;
    /// Is used from several threads without locks (it is changed with ALTER).
    /// May not contain size of checksums.txt and columns.txt

    time_t modification_time = 0;
    /// When the part is removed from the working set. Changes once.
    mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
@@ -61,7 +61,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,

@@ -81,7 +81,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
{
    if (settings.blocks_are_granules_size && !index_granularity.empty())
        throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);

    Poco::File part_dir(part_path);
    if (!part_dir.exists())
        part_dir.createDirectories();

@@ -126,12 +126,12 @@ void fillIndexGranularityImpl(
    /// We should be less or equal than fixed index granularity
    index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    size_t current_row;
    for (current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
    {
        size_t rows_left_in_block = rows_in_block - current_row;

        // if (need_finish_last_granule && rows_left_in_block < index_granularity_for_block)
        // {
        //     /// If enough rows are left, create a new granule. Otherwise, extend previous granule.
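
The fillIndexGranularityImpl hunk above is where a block's rows are split into granules (one mark per granule). As a rough illustration of that loop, here is a self-contained sketch; splitBlockIntoGranules and its parameters are hypothetical names, not ClickHouse's actual interface, and the commented-out need_finish_last_granule handling is ignored:

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    /// Sketch only: computes the row count of each granule for one block,
    /// assuming a fixed row-based granularity.
    std::vector<size_t> splitBlockIntoGranules(
        size_t rows_in_block,
        size_t index_offset,                  /// rows that still belong to the previous, unfinished granule
        size_t fixed_index_granularity_rows,
        size_t index_granularity_for_block)
    {
        /// We should be less or equal than the fixed index granularity.
        index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

        std::vector<size_t> granule_rows;
        for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
        {
            size_t rows_left_in_block = rows_in_block - current_row;
            /// The last granule of the block may be shorter than the target granularity.
            granule_rows.push_back(std::min(rows_left_in_block, index_granularity_for_block));
        }
        return granule_rows;
    }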
@@ -342,7 +342,7 @@ void IMergeTreeDataPartWriter::finishPrimaryIndexSerialization(MergeTreeData::Da
    }

    std::cerr << "(finishPrimaryIndexSerialization) marks_count: " << index_granularity.getMarksCount() << "\n";

    index_stream->next();
    checksums.files["primary.idx"].file_size = index_stream->count();
@@ -60,7 +60,7 @@ public:
        const String & part_path,
        const MergeTreeData & storage,
        const NamesAndTypesList & columns_list,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
        const String & marks_file_extension,
        const CompressionCodecPtr & default_codec,
        const MergeTreeWriterSettings & settings,

@@ -78,7 +78,7 @@ public:
    void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, size_t rows);

    /// Shift mark and offset to prepare read next mark.
    /// You must call it after calling write method and optionally
    /// calling calculations of primary and skip indices.
    void next();

@@ -102,8 +102,8 @@ public:
    void initSkipIndices();
    void initPrimaryIndex();

    virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync = false) = 0;
    void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums);
    void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
@@ -818,7 +818,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
    MergeTreePartInfo part_info;
    if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
        return;

    auto part = createPart(part_name, part_info, part_disk_ptr, part_name);
    bool broken = false;

@@ -1485,7 +1485,7 @@ MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, s
    const auto settings = getSettings();
    if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
        return MergeTreeDataPartType::COMPACT;

    return MergeTreeDataPartType::WIDE;
}

@@ -1497,7 +1497,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
    if (type == MergeTreeDataPartType::COMPACT)
        return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, disk, relative_path);
    else if (type == MergeTreeDataPartType::WIDE)
        return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, disk, relative_path);
    else
        throw Exception("Unknown part type", ErrorCodes::LOGICAL_ERROR);
}
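
Taken together, the two hunks above carry the core of the polymorphic-parts design: choosePartType compares the estimated size of a new part against the min_bytes_for_wide_part and min_rows_for_wide_part settings and picks the compact or wide format, and createPart then constructs the matching part implementation. A minimal standalone sketch of that flow, using toy types rather than the real ClickHouse classes:

    #include <cstddef>
    #include <memory>
    #include <stdexcept>

    enum class PartType { Compact, Wide };

    struct PartSettings
    {
        size_t min_bytes_for_wide_part = 0;
        size_t min_rows_for_wide_part = 0;
    };

    /// Small parts are stored in the compact format, large parts in the wide format.
    PartType choosePartType(const PartSettings & settings, size_t bytes_uncompressed, size_t rows_count)
    {
        if (bytes_uncompressed < settings.min_bytes_for_wide_part || rows_count < settings.min_rows_for_wide_part)
            return PartType::Compact;
        return PartType::Wide;
    }

    struct IPart { virtual ~IPart() = default; };
    struct CompactPart : IPart {};
    struct WidePart : IPart {};

    /// Factory dispatch: the caller only sees the common part interface.
    std::shared_ptr<IPart> createPart(PartType type)
    {
        if (type == PartType::Compact)
            return std::make_shared<CompactPart>();
        else if (type == PartType::Wide)
            return std::make_shared<WidePart>();
        throw std::logic_error("Unknown part type");
    }

The later hunks in MergeTreeDataMergerMutator and MergeTreeDataWriter all route part creation through data.createPart, so merges, mutations, and inserts appear to share this same selection path.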
@@ -1513,7 +1513,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
{
    auto part = createPart(name, choosePartType(bytes_uncompressed, rows_count), part_info, disk, relative_path);
    part->setColumns(columns_list);
    /// Don't save rows_count count here as it can change later
    return part;
}

@@ -1552,7 +1552,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
    }

    return createPart(name, type, part_info, disk, relative_path);
}
}

void MergeTreeData::alterDataPart(
    const NamesAndTypesList & new_columns,

@@ -1639,7 +1639,7 @@ void MergeTreeData::alterDataPart(
    }

    DataPart::Checksums add_checksums;

    if (transaction->rename_map.empty() && !res.force_update_metadata)
    {
        transaction->clear();
@@ -187,7 +187,7 @@ public:
        const MergeTreePartInfo & part_info,const DiskSpace::DiskPtr & disk,
        const NamesAndTypesList & columns,
        size_t bytes_on_disk, size_t rows_num, const String & relative_path) const;

    MutableDataPartPtr createPart(const String & name,
        MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
        const DiskSpace::DiskPtr & disk, const String & relative_path) const;

@@ -196,7 +196,7 @@ public:
    /// FIXME make this inside this function
    MutableDataPartPtr createPart(const String & name,
        const DiskSpace::DiskPtr & disk, const String & relative_path) const;

    MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,
        const DiskSpace::DiskPtr & disk, const String & relative_path) const;

@@ -910,7 +910,7 @@ protected:
        const NamesAndTypesList & new_columns,
        const IndicesASTs & old_indices,
        const IndicesASTs & new_indices) const;

    /// Expression for column type conversion.
    /// If no conversions are needed, out_expression=nullptr.
    /// out_rename_map maps column files for the out_expression onto new table files.
@@ -568,7 +568,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
    size_t estimated_bytes_uncompressed = 0;
    for (const auto & part : parts)
        estimated_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;

    MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
        future_part.name,
        future_part.part_info,

@@ -622,7 +622,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
        for (const MergeTreeData::DataPartPtr & part : parts)
            part->accumulateColumnSizes(merged_column_to_size);

        column_sizes = ColumnSizeEstimator(merged_column_to_size, merging_column_names, gathering_column_names);
    }
    else

@@ -946,7 +946,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
            future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery(
                command.partition, context_for_reading);
        });

    if (isCompactPart(source_part))
        commands_for_part.additional_columns = source_part->columns.getNames();

@@ -959,7 +959,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
    }
    else
        LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);

    auto in = mutations_interpreter.execute(table_lock_holder);
    const auto & updated_header = mutations_interpreter.getUpdatedHeader();

@@ -980,7 +980,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
        source_part->bytes_on_disk,
        source_part->rows_count,
        "tmp_mut_" + future_part.name);

    new_data_part->is_temp = true;
    new_data_part->ttl_infos = source_part->ttl_infos;
@@ -52,7 +52,7 @@ namespace ErrorCodes
// return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
// }

MergeTreeDataPartCompact::MergeTreeDataPartCompact(
    MergeTreeData & storage_,
    const String & name_,
    const DiskSpace::DiskPtr & disk_,

@@ -154,7 +154,7 @@ String MergeTreeDataPartCompact::getColumnNameWithMinumumCompressedSize() const
    if (!minimum_size_column)
        throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);

    return *minimum_size_column;
}

@@ -192,7 +192,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
    if (index_granularity.getMarksCount() * index_granularity_info.mark_size_in_bytes != marks_file_size)
        throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);

    index_granularity.setInitialized();
}

bool MergeTreeDataPartCompact::hasColumnFiles(const String & column_name, const IDataType &) const

@@ -246,9 +246,9 @@ NameToNameMap MergeTreeDataPartCompact::createRenameMapForAlter(
            projection.emplace_back(column.name, "");
        }
    }

    analysis_result.expression->add(ExpressionAction::project(projection));

    String data_temp_name = String(DATA_FILE_NAME) + "_converting";
    rename_map[data_temp_name + DATA_FILE_EXTENSION] = String(DATA_FILE_NAME) + DATA_FILE_EXTENSION;
    rename_map[data_temp_name + part_mrk_file_extension] = DATA_FILE_NAME + part_mrk_file_extension;
@@ -44,7 +44,7 @@ public:
        const DiskSpace::DiskPtr & disk_,
        const std::optional<String> & relative_path_ = {});

    MergeTreeDataPartCompact(
        MergeTreeData & storage_,
        const String & name_,
        const DiskSpace::DiskPtr & disk_,

@@ -58,7 +58,7 @@ public:
        const MergeTreeReaderSettings & reader_settings_,
        const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
        const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{}) const override;

    MergeTreeWriterPtr getWriter(
        const NamesAndTypesList & columns_list,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
@@ -49,7 +49,7 @@ namespace ErrorCodes
// return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
// }

MergeTreeDataPartWide::MergeTreeDataPartWide(
    MergeTreeData & storage_,
    const String & name_,
    const DiskSpace::DiskPtr & disk_,

@@ -92,7 +92,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter(
        getFullPath(), storage, columns_list, indices_to_recalc,
        index_granularity_info.marks_file_extension,
        default_codec, writer_settings, computed_index_granularity);
}
}

/// Takes into account the fact that several columns can e.g. share their .size substreams.
@@ -14,7 +14,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,

@@ -78,7 +78,7 @@ void MergeTreeDataPartWriterCompact::write(
    auto result = squashing.add(result_block.mutateColumns());
    if (!result.ready)
        return;

    result_block = header.cloneWithColumns(std::move(result.columns));

    writeBlock(result_block);

@@ -86,7 +86,7 @@ void MergeTreeDataPartWriterCompact::write(

void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
{
    size_t total_rows = block.rows();
    size_t from_mark = current_mark;
    size_t current_row = 0;

@@ -100,7 +100,7 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
        /// There could already be enough data to compress into the new block.
        if (stream->compressed.offset() >= settings.min_compress_block_size)
            stream->compressed.next();

        size_t next_row = 0;
        writeIntBinary(rows_to_write, stream->marks);
        for (const auto & it : columns_list)

@@ -139,13 +139,13 @@ size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWith
    /// We can't calculate compressed size by single column in compact format.
    size_t uncompressed_size = stream->compressed.count();
    columns_sizes[column.name].add(ColumnSize{0, 0, uncompressed_size - old_uncompressed_size});

    return from_row + number_of_rows;
}

void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
{
    {
        auto result = squashing.add({});
        if (result.ready && !result.columns.empty())
            writeBlock(header.cloneWithColumns(std::move(result.columns)));
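
The writer hunks above hint at the compact on-disk layout: writeBlock accumulates input through a squashing transform, writes the granule's row count to the marks stream (writeIntBinary(rows_to_write, stream->marks)), serializes every column into one shared stream, and, as the comment notes, can only account uncompressed per-column sizes because all columns share a single compressed stream. A toy illustration of the idea, not the actual ClickHouse format or API:

    #include <cstdint>
    #include <string>
    #include <vector>

    /// Toy model of a compact granule: every column is serialized into one shared
    /// data stream, and the marks stream stores the row count of the granule plus
    /// the offset at which each column starts. Layout and names are illustrative.
    struct ColumnChunk
    {
        std::string name;
        std::vector<uint64_t> values;   /// pretend every column is fixed-width UInt64
    };

    void appendUInt64(std::string & out, uint64_t value)
    {
        out.append(reinterpret_cast<const char *>(&value), sizeof(value));
    }

    void writeGranule(const std::vector<ColumnChunk> & columns, size_t rows_to_write,
                      std::string & data_stream, std::string & marks_stream)
    {
        appendUInt64(marks_stream, rows_to_write);          /// rows in this granule
        for (const auto & column : columns)
        {
            appendUInt64(marks_stream, data_stream.size()); /// where this column starts
            for (size_t i = 0; i != rows_to_write && i != column.values.size(); ++i)
                appendUInt64(data_stream, column.values[i]);
        }
    }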
@@ -11,7 +11,7 @@ public:
        const String & part_path,
        const MergeTreeData & storage,
        const NamesAndTypesList & columns_list,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
        const String & marks_file_extension,
        const CompressionCodecPtr & default_codec,
        const MergeTreeWriterSettings & settings,
@@ -12,7 +12,7 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,

@@ -80,7 +80,7 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
    };
}

void MergeTreeDataPartWriterWide::write(const Block & block,
    const IColumn::Permutation * permutation,
    const Block & primary_key_block, const Block & skip_indexes_block)
{
@@ -13,7 +13,7 @@ public:
        const String & part_path,
        const MergeTreeData & storage,
        const NamesAndTypesList & columns_list,
        const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
        const String & marks_file_extension,
        const CompressionCodecPtr & default_codec,
        const MergeTreeWriterSettings & settings,

@@ -55,7 +55,7 @@ private:
        WrittenOffsetColumns & offset_columns,
        size_t number_of_rows,
        DB::IDataType::SubstreamPath & path);

    void writeFinalMark(
        const std::string & column_name,
        const DataTypePtr column_type,
@@ -567,7 +567,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
    BlockInputStreams res;

    MergeTreeReaderSettings reader_settings =
    {
        .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
        .max_read_buffer_size = settings.max_read_buffer_size,
@@ -206,7 +206,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
    NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());

    auto new_data_part = data.createPart(
        part_name, new_part_info,
        reservation->getDisk(),
        columns,
        expected_size,
@@ -1,5 +1,5 @@
#pragma once
#include <cstddef>
#include <Core/Settings.h>

namespace DB
@@ -21,7 +21,7 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
    for (Poco::DirectoryIterator part_it(path_to_part); part_it != end; ++part_it)
    {
        const auto & ext = "." + part_it.path().getExtension();
        if (ext == getNonAdaptiveMrkExtension()
            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::WIDE)
            || ext == getAdaptiveMrkExtension(MergeTreeDataPartType::COMPACT))
            return ext;

@@ -83,7 +83,7 @@ void MergeTreeIndexGranularityInfo::setNonAdaptive()

std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)
{
-   switch(part_type)
+   switch (part_type)
    {
        case MergeTreeDataPartType::WIDE:
            return ".mrk2";

@@ -96,7 +96,7 @@ std::string getAdaptiveMrkExtension(MergeTreeDataPartType part_type)

size_t getAdaptiveMrkSize(MergeTreeDataPartType part_type, size_t columns_num)
{
-   switch(part_type)
+   switch (part_type)
    {
        case MergeTreeDataPartType::WIDE:
            return sizeof(UInt64) * 3;
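
The two hunks above show that the mark file format now depends on the part type: wide parts keep the adaptive ".mrk2" extension with three UInt64 fields per mark, while compact parts get their own extension and a mark size that depends on the number of columns. The compact values are not visible in this excerpt, so the sketch below uses clearly marked placeholders for that branch:

    #include <cstddef>
    #include <cstdint>
    #include <string>

    enum class PartType { Wide, Compact };

    /// The Wide values come straight from the hunks above; the Compact branch is
    /// a placeholder, not taken from the diff.
    std::string adaptiveMarkExtension(PartType part_type)
    {
        switch (part_type)
        {
            case PartType::Wide:
                return ".mrk2";
            case PartType::Compact:
                return ".cmrk";     /// placeholder extension
        }
        return {};
    }

    size_t adaptiveMarkSize(PartType part_type, size_t columns_num)
    {
        switch (part_type)
        {
            case PartType::Wide:
                return sizeof(uint64_t) * 3;                 /// per mark: file offset, block offset, row count
            case PartType::Compact:
                return sizeof(uint64_t) * (columns_num + 1); /// placeholder formula
        }
        return 0;
    }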
@@ -8,7 +8,7 @@ MergeTreeIndexReader::MergeTreeIndexReader(
    MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_)
    : index(index_), stream(
        part_->getFullPath() + index->getFileName(), ".idx", marks_count_,
        all_mark_ranges_,
        { 0, DBMS_DEFAULT_BUFFER_SIZE, false}, nullptr, nullptr,
        part_->getFileSizeOrZero(index->getFileName() + ".idx"),
        &part_->index_granularity_info,
@@ -20,9 +20,9 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
    if (!marks)
        loadMarks();
    if (column_index >= columns_num)
        throw Exception("Column index: " + toString(column_index)
            + " is out of range [0, " + toString(columns_num) + ")", ErrorCodes::LOGICAL_ERROR);

    return (*marks)[row_index * columns_num + column_index];
}

@@ -47,7 +47,7 @@ void MergeTreeMarksLoader::loadMarks()
        marks = load();

        if (!marks)
            throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
    }

}
@@ -16,7 +16,7 @@ public:
        const LoadFunc & load_func_,
        bool save_marks_in_cache_,
        size_t columns_num_ = 1);

    const MarkInCompressedFile & getMark(size_t row_index, size_t column_index = 0);

    bool initialized() const { return marks != nullptr; }
@@ -109,7 +109,7 @@ bool MergeTreePartsMover::selectPartsForMove(
        /// Don't report message to log, because logging is excessive
        if (!can_move(part, &reason))
            continue;

        auto to_insert = need_to_move.find(part->disk);
        if (to_insert != need_to_move.end())
            to_insert->second.add(part);
@@ -483,7 +483,7 @@ size_t MergeTreeRangeReader::Stream::ceilRowsToCompleteGranules(size_t rows_num)
    size_t from_mark = current_mark;
    while (result < rows_num && from_mark < last_mark)
        result += index_granularity->getMarkRows(from_mark++);

    return result;
}
@@ -95,7 +95,7 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
        size_t read_rows_in_column = column->size() - column_size_before_reading;

        if (read_rows_in_column < rows_to_read)
            throw Exception("Cannot read all data in MergeTreeReaderCompact. Rows read: " + toString(read_rows_in_column) +
                ". Rows expected: " + toString(rows_to_read) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);

        /// For elements of Nested, column_size_before_reading may be greater than column size

@@ -142,7 +142,7 @@ void MergeTreeReaderCompact::readData(
    /// FIXME seek only if needed
    seekToMark(from_mark, column_position);

    IDataType::DeserializeBinaryBulkSettings deserialize_settings;
    deserialize_settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return data_buffer; };
    // deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
@@ -117,11 +117,11 @@ void MergeTreeReaderStream::initMarksLoader()
        /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        std::cerr << "data_file_extension: " << data_file_extension << '\n';

        size_t file_size = Poco::File(mrk_path).getSize();
        size_t mark_size = mode == ReadingMode::INDEX
            ? index_granularity_info->skip_index_mark_size_in_bytes
            : index_granularity_info->mark_size_in_bytes;

        size_t expected_file_size = mark_size * marks_count;
@@ -158,10 +158,10 @@ void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type
        */
    if (!data_file_exists)
        return;

    std::cerr << "(addStreams) part: " << path << '\n';
    std::cerr << "(addStreams) marks count: " << data_part->getMarksCount() << "\n";

    streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
        path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
        all_mark_ranges, settings, mark_cache,
@@ -36,7 +36,7 @@ private:
    FileStreams streams;

    /// Columns that are read.

    void addStreams(const String & name, const IDataType & type,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
@@ -30,7 +30,7 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
    bool quiet)
    :
    MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
        preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
        reader_settings_, use_uncompressed_cache_, virt_column_names_},
    required_columns{required_columns_},
    data_part{owned_data_part_},
@@ -63,7 +63,7 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
        .save_marks_in_cache = false
    };

    reader = data_part->getReader(columns_for_reader,
        MarkRanges{MarkRange(0, data_part->getMarksCount())},
        /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
}
@@ -26,7 +26,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
    bool blocks_are_granules_size)
    : IMergedBlockOutputStream(data_part)
    , columns_list(columns_list_)
{
    {
        MergeTreeWriterSettings writer_settings(data_part->storage.global_context.getSettings(),
            data_part->storage.canUseAdaptiveGranularity(), blocks_are_granules_size);
        writer = data_part->getWriter(columns_list, data_part->storage.getSkipIndices(), default_codec, writer_settings);

@@ -168,8 +168,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
    new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
    new_part->index_granularity = writer->getIndexGranularity();
    new_part->columns_sizes = columns_sizes;
-   std::cerr << "(writeSuffixAndFinalizePart) part: " << new_part->getFullPath() << "\n";
-   std::cerr << "(writeSuffixAndFinalizePart) marks_count: " << new_part->index_granularity.getMarksCount() << "\n";
}

void MergedBlockOutputStream::init()
@@ -44,7 +44,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
    if (!rows)
        return;

    std::cerr << "(MergedColumnOnlyOutputStream::write) writing rows: " << rows << "\n";

    writer->write(block);
    writer->calculateAndSerializeSkipIndices(skip_indexes_block, rows);
@@ -201,7 +201,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
        ReadBufferFromFile buf(path + "columns.txt");
        columns.readText(buf);
        assertEOF(buf);
    }
    }

    /// Checksums from file checksums.txt. May be absent. If present, they are subsequently compared with the actual data checksums.
    MergeTreeData::DataPart::Checksums checksums_txt;
@@ -48,7 +48,7 @@ public:
    void writeText(WriteBuffer & out) const;
    void readText(ReadBuffer & in);

-   /// Columns that we need to read except ones needed for expressions.
+   /// Extra columns that we need to read except ones needed for expressions.
    Names additional_columns;
};