This commit is contained in:
kssenii 2022-09-05 18:55:00 +02:00
parent e12858dca5
commit 83514fa2ef
39 changed files with 444 additions and 185 deletions

View File

@ -145,5 +145,11 @@ String FieldVisitorToString::operator() (const Object & x) const
}
String convertFieldToString(const Field & field)
{
if (field.getType() == Field::Types::Which::String)
return field.get<String>();
return applyVisitor(FieldVisitorToString(), field);
}
}

View File

@ -31,5 +31,8 @@ public:
String operator() (const bool & x) const;
};
}
/// Get value from field and convert it to string.
/// Also remove quotes from strings.
String convertFieldToString(const Field & field);
}

View File

@ -44,15 +44,6 @@ struct AttributeConfiguration
using AttributeNameToConfiguration = std::unordered_map<std::string, AttributeConfiguration>;
/// Get value from field and convert it to string.
/// Also remove quotes from strings.
String getFieldAsString(const Field & field)
{
if (field.getType() == Field::Types::Which::String)
return field.get<String>();
return applyVisitor(FieldVisitorToString(), field);
}
String getAttributeExpression(const ASTDictionaryAttributeDeclaration * dict_attr)
{
if (!dict_attr->expression)
@ -61,7 +52,7 @@ String getAttributeExpression(const ASTDictionaryAttributeDeclaration * dict_att
/// EXPRESSION PROPERTY should be expression or string
String expression_str;
if (const auto * literal = dict_attr->expression->as<ASTLiteral>(); literal && literal->value.getType() == Field::Types::String)
expression_str = getFieldAsString(literal->value);
expression_str = convertFieldToString(literal->value);
else
expression_str = queryToString(dict_attr->expression);
@ -275,7 +266,7 @@ void buildSingleAttribute(
AutoPtr<Element> null_value_element(doc->createElement("null_value"));
String null_value_str;
if (dict_attr->default_value)
null_value_str = getFieldAsString(dict_attr->default_value->as<ASTLiteral>()->value);
null_value_str = convertFieldToString(dict_attr->default_value->as<ASTLiteral>()->value);
AutoPtr<Text> null_value(doc->createTextNode(null_value_str));
null_value_element->appendChild(null_value);
attribute_element->appendChild(null_value_element);
@ -452,7 +443,7 @@ void buildConfigurationFromFunctionWithKeyValueArguments(
}
else if (const auto * literal = pair->second->as<const ASTLiteral>())
{
AutoPtr<Text> value(doc->createTextNode(getFieldAsString(literal->value)));
AutoPtr<Text> value(doc->createTextNode(convertFieldToString(literal->value)));
current_xml_element->appendChild(value);
}
else if (const auto * list = pair->second->as<const ASTExpressionList>())
@ -473,7 +464,7 @@ void buildConfigurationFromFunctionWithKeyValueArguments(
Field value;
result->get(0, value);
AutoPtr<Text> text_value(doc->createTextNode(getFieldAsString(value)));
AutoPtr<Text> text_value(doc->createTextNode(convertFieldToString(value)));
current_xml_element->appendChild(text_value);
}
else
@ -519,7 +510,7 @@ void buildSourceConfiguration(
{
AutoPtr<Element> setting_change_element(doc->createElement(name));
settings_element->appendChild(setting_change_element);
AutoPtr<Text> setting_value(doc->createTextNode(getFieldAsString(value)));
AutoPtr<Text> setting_value(doc->createTextNode(convertFieldToString(value)));
setting_change_element->appendChild(setting_value);
}
}

View File

@ -239,7 +239,16 @@ public:
}
/// For one local path there might be multiple remote paths in case of Log family engines.
using LocalPathWithObjectStoragePaths = std::pair<String, StoredObjects>;
struct LocalPathWithObjectStoragePaths
{
std::string local_path;
std::string common_prefix_for_objects;
StoredObjects objects;
LocalPathWithObjectStoragePaths(
const std::string & local_path_, const std::string & common_prefix_for_objects_, StoredObjects && objects_)
: local_path(local_path_), common_prefix_for_objects(common_prefix_for_objects_), objects(std::move(objects_)) {}
};
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
{

View File

@ -127,7 +127,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
{
try
{
paths_map.emplace_back(local_path, getStorageObjects(local_path));
paths_map.emplace_back(local_path, metadata_storage->getObjectStorageRootPath(), getStorageObjects(local_path));
}
catch (const Exception & e)
{

View File

@ -68,6 +68,14 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
}
}
void DiskObjectStorageMetadata::createFromSingleObject(const std::string & relative_path, size_t bytes_size, size_t ref_count_, bool read_only_)
{
storage_objects.emplace_back(relative_path, bytes_size);
total_size = bytes_size;
ref_count = ref_count_;
read_only = read_only_;
}
void DiskObjectStorageMetadata::deserializeFromString(const std::string & data)
{
ReadBufferFromString buf(data);

View File

@ -50,6 +50,7 @@ public:
void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);
void createFromSingleObject(const std::string & relative_path, size_t bytes_size, size_t ref_count_, bool is_read_only_);
void serialize(WriteBuffer & buf, bool sync) const;
std::string serializeToString() const;

View File

@ -179,7 +179,6 @@ Pipe ReadFromMergeTree::readFromPool(
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
data,
storage_snapshot,
prewhere_info,
required_columns,

View File

@ -0,0 +1,24 @@
#pragma once
#include <string>
#include <unordered_map>
namespace DB
{
/// Alter conversions which should be applied on-fly for part. Build from of
/// the most recent mutation commands for part. Now we have only rename_map
/// here (from ALTER_RENAME) command, because for all other type of alters
/// we can deduce conversions for part from difference between
/// part->getColumns() and storage->getColumns().
struct AlterConversions
{
/// Rename map new_name -> old_name
std::unordered_map<std::string, std::string> rename_map;
bool isColumnRenamed(const std::string & new_name) const { return rename_map.count(new_name) > 0; }
std::string getColumnOldName(const std::string & new_name) const { return rename_map.at(new_name); }
};
}

View File

@ -0,0 +1,68 @@
#pragma once
#include <Interpreters/Context.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
class IDataPartStorage;
using DataPartStoragePtr = std::shared_ptr<IDataPartStorage>;
class MergeTreeIndexGranularity;
struct MergeTreeDataPartChecksums;
struct MergeTreeIndexGranularityInfo;
class ISerialization;
using SerializationPtr = std::shared_ptr<const ISerialization>;
/**
* A class which contains all information about a data part that is required
* in order to use MergeTreeDataPartReader's.
* It is a separate interface and not a simple struct because
* otherwise it will need to copy all the information which might not
* be even used (for example, an IndexGranulary class object is quite heavy).
*/
class IMergeTreeDataPartInfoForReader : public WithContext
{
public:
explicit IMergeTreeDataPartInfoForReader(ContextPtr context_) : WithContext(context_) {}
virtual ~IMergeTreeDataPartInfoForReader() = default;
virtual bool isCompactPart() const = 0;
virtual bool isWidePart() const = 0;
virtual bool isInMemoryPart() const = 0;
virtual bool isProjectionPart() const = 0;
virtual const DataPartStoragePtr & getDataPartStorage() const = 0;
virtual const NamesAndTypesList & getColumns() const = 0;
virtual std::optional<size_t> getColumnPosition(const String & column_name) const = 0;
virtual String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const = 0;
virtual const MergeTreeDataPartChecksums & getChecksums() const = 0;
virtual AlterConversions getAlterConversions() const = 0;
virtual size_t getMarksCount() const = 0;
virtual size_t getFileSizeOrZero(const std::string & file_name) const = 0;
virtual const MergeTreeIndexGranularityInfo & getIndexGranularityInfo() const = 0;
virtual const MergeTreeIndexGranularity & getIndexGranularity() const = 0;
virtual SerializationPtr getSerialization(const NameAndTypePair & column) const = 0;
virtual const SerializationInfoByName & getSerializationInfos() const = 0;
virtual void reportBroken() = 0;
};
using MergeTreeDataPartInfoForReaderPtr = std::shared_ptr<IMergeTreeDataPartInfoForReader>;
}

View File

@ -23,7 +23,7 @@ namespace ErrorCodes
IMergeTreeReader::IMergeTreeReader(
const MergeTreeData::DataPartPtr & data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -31,19 +31,18 @@ IMergeTreeReader::IMergeTreeReader(
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_)
: data_part(data_part_)
: data_part_info_for_read(data_part_info_for_read_)
, avg_value_size_hints(avg_value_size_hints_)
, uncompressed_cache(uncompressed_cache_)
, mark_cache(mark_cache_)
, settings(settings_)
, storage(data_part_->storage)
, metadata_snapshot(metadata_snapshot_)
, all_mark_ranges(all_mark_ranges_)
, alter_conversions(storage.getAlterConversionsForPart(data_part))
, alter_conversions(data_part_info_for_read->getAlterConversions())
/// For wide parts convert plain arrays of Nested to subcolumns
/// to allow to use shared offset column from cache.
, requested_columns(isWidePart(data_part) ? Nested::convertToSubcolumns(columns_) : columns_)
, part_columns(isWidePart(data_part) ? Nested::collect(data_part->getColumns()) : data_part->getColumns())
, requested_columns(data_part_info_for_read->isWidePart() ? Nested::convertToSubcolumns(columns_) : columns_)
, part_columns(data_part_info_for_read->isWidePart() ? Nested::collect(data_part_info_for_read->getColumns()) : data_part_info_for_read->getColumns())
{
columns_to_read.reserve(requested_columns.size());
serializations.reserve(requested_columns.size());
@ -71,7 +70,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + ")");
throw;
}
}
@ -99,13 +98,13 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
auto dag = DB::evaluateMissingDefaults(
additional_columns, requested_columns, metadata_snapshot->getColumns(), storage.getContext());
additional_columns, requested_columns, metadata_snapshot->getColumns(), data_part_info_for_read->getContext());
if (dag)
{
dag->addMaterializingOutputActions();
auto actions = std::make_shared<
ExpressionActions>(std::move(dag),
ExpressionActionsSettings::fromSettings(storage.getContext()->getSettingsRef()));
ExpressionActionsSettings::fromSettings(data_part_info_for_read->getContext()->getSettingsRef()));
actions->execute(additional_columns);
}
@ -117,7 +116,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + ")");
throw;
}
}
@ -151,7 +150,7 @@ SerializationPtr IMergeTreeReader::getSerializationInPart(const NameAndTypePair
if (!column_in_part)
return IDataType::getSerialization(required_column);
const auto & infos = data_part->getSerializationInfos();
const auto & infos = data_part_info_for_read->getSerializationInfos();
if (auto it = infos.find(column_in_part->getNameInStorage()); it != infos.end())
return IDataType::getSerialization(*column_in_part, *it->second);
@ -187,7 +186,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
copy_block.insert({res_columns[pos], getColumnInPart(*name_and_type).type, name_and_type->name});
}
DB::performRequiredConversions(copy_block, requested_columns, storage.getContext());
DB::performRequiredConversions(copy_block, requested_columns, data_part_info_for_read->getContext());
/// Move columns from block.
name_and_type = requested_columns.begin();
@ -197,7 +196,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + ")");
throw;
}
}
@ -205,11 +204,11 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
{
String table_name = Nested::extractTableName(column_name);
for (const auto & part_column : data_part->getColumns())
for (const auto & part_column : data_part_info_for_read->getColumns())
{
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
{
auto position = data_part->getColumnPosition(part_column.getNameInStorage());
auto position = data_part_info_for_read->getColumnPosition(part_column.getNameInStorage());
if (position && Nested::extractTableName(part_column.name) == table_name)
return position;
}

View File

@ -4,6 +4,8 @@
#include <Common/HashTable/HashMap.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
{
@ -20,7 +22,7 @@ public:
using DeserializeBinaryBulkStateMap = std::map<std::string, ISerialization::DeserializeBinaryBulkStatePtr>;
IMergeTreeReader(
const MergeTreeData::DataPartPtr & data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -57,7 +59,7 @@ public:
size_t getFirstMarkToRead() const { return all_mark_ranges.front().begin; }
MergeTreeData::DataPartPtr data_part;
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
protected:
/// Returns actual column name in part, which can differ from table metadata.
@ -86,7 +88,6 @@ protected:
MergeTreeReaderSettings settings;
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
MarkRanges all_mark_ranges;
@ -95,7 +96,7 @@ protected:
private:
/// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions;
AlterConversions alter_conversions;
/// Columns that are requested to read.
NamesAndTypesList requested_columns;

View File

@ -0,0 +1,55 @@
#pragma once
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class LoadedMergeTreeDataPartInfoForReader final : public IMergeTreeDataPartInfoForReader
{
public:
explicit LoadedMergeTreeDataPartInfoForReader(MergeTreeData::DataPartPtr data_part_)
: IMergeTreeDataPartInfoForReader(data_part_->storage.getContext())
, data_part(data_part_)
{}
bool isCompactPart() const override { return DB::isCompactPart(data_part); }
bool isWidePart() const override { return DB::isWidePart(data_part); }
bool isInMemoryPart() const override { return DB::isInMemoryPart(data_part); }
bool isProjectionPart() const override { return data_part->isProjectionPart(); }
const DataPartStoragePtr & getDataPartStorage() const override { return data_part->data_part_storage; }
const NamesAndTypesList & getColumns() const override { return data_part->getColumns(); }
std::optional<size_t> getColumnPosition(const String & column_name) const override { return data_part->getColumnPosition(column_name); }
AlterConversions getAlterConversions() const override { return data_part->storage.getAlterConversionsForPart(data_part); }
String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const override { return data_part->getColumnNameWithMinimumCompressedSize(with_subcolumns); }
const MergeTreeDataPartChecksums & getChecksums() const override { return data_part->checksums; }
void reportBroken() override { data_part->storage.reportBrokenPart(data_part); }
size_t getMarksCount() const override { return data_part->getMarksCount(); }
size_t getFileSizeOrZero(const std::string & file_name) const override { return data_part->getFileSizeOrZero(file_name); }
const MergeTreeIndexGranularityInfo & getIndexGranularityInfo() const override { return data_part->index_granularity_info; }
const MergeTreeIndexGranularity & getIndexGranularity() const override { return data_part->index_granularity; }
const SerializationInfoByName & getSerializationInfos() const override { return data_part->getSerializationInfos(); }
SerializationPtr getSerialization(const NameAndTypePair & column) const override { return data_part->getSerialization(column.name); }
private:
MergeTreeData::DataPartPtr data_part;
};
}

View File

@ -36,4 +36,16 @@ size_t getLastMark(const MarkRanges & ranges)
return current_task_last_mark;
}
std::string toString(const MarkRanges & ranges)
{
std::string result;
for (const auto & mark_range : ranges)
{
if (!result.empty())
result += ", ";
result += "(" + std::to_string(mark_range.begin) + ", " + std::to_string(mark_range.end) + ")";
}
return result;
}
}

View File

@ -32,4 +32,6 @@ using MarkRanges = std::deque<MarkRange>;
*/
size_t getLastMark(const MarkRanges & ranges);
std::string toString(const MarkRanges & ranges);
}

View File

@ -43,6 +43,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings))
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
@ -72,7 +73,12 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
header_without_virtual_columns.erase(*it);
}
}
}
std::unique_ptr<PrewhereExprInfo> MergeTreeBaseSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings)
{
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
if (prewhere_info)
{
prewhere_actions = std::make_unique<PrewhereExprInfo>();
@ -100,6 +106,8 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
}
return prewhere_actions;
}
@ -262,45 +270,62 @@ void MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart(
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
MergeTreeRangeReader* prev_reader = nullptr;
return initializeRangeReadersImpl(
current_task.range_reader, current_task.pre_range_readers, prewhere_info, prewhere_actions.get(),
reader.get(), current_task.data_part->hasLightweightDelete(), reader_settings,
pre_reader_for_step, lightweight_delete_filter_step, non_const_virtual_column_names);
}
void MergeTreeBaseSelectProcessor::initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader, std::deque<MergeTreeRangeReader> & pre_range_readers,
PrewhereInfoPtr prewhere_info, const PrewhereExprInfo * prewhere_actions,
IMergeTreeReader * reader, bool has_lightweight_delete, const MergeTreeReaderSettings & reader_settings,
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
const PrewhereExprStep & lightweight_delete_filter_step, const Names & non_const_virtual_column_names)
{
MergeTreeRangeReader * prev_reader = nullptr;
bool last_reader = false;
size_t pre_readers_shift = 0;
/// Add filtering step with lightweight delete mask
if (reader_settings.apply_deleted_mask && current_task.data_part->hasLightweightDelete())
if (reader_settings.apply_deleted_mask && has_lightweight_delete)
{
current_task.pre_range_readers.push_back(
MergeTreeRangeReader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_readers.back();
MergeTreeRangeReader pre_range_reader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names);
pre_range_readers.push_back(std::move(pre_range_reader));
prev_reader = &pre_range_readers.back();
pre_readers_shift++;
}
if (prewhere_info)
{
if (prewhere_actions->steps.size() + pre_readers_shift != pre_reader_for_step.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
}
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
{
last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size());
current_task.pre_range_readers.push_back(
MergeTreeRangeReader(pre_reader_for_step[i + pre_readers_shift].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_readers.back();
MergeTreeRangeReader current_reader(pre_reader_for_step[i + pre_readers_shift].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names);
pre_range_readers.push_back(std::move(current_reader));
prev_reader = &pre_range_readers.back();
}
}
if (!last_reader)
{
current_task.range_reader = MergeTreeRangeReader(reader.get(), prev_reader, nullptr, true, non_const_virtual_column_names);
range_reader = MergeTreeRangeReader(reader, prev_reader, nullptr, true, non_const_virtual_column_names);
}
else
{
/// If all columns are read by pre_range_readers than move last pre_range_reader into range_reader
current_task.range_reader = std::move(current_task.pre_range_readers.back());
current_task.pre_range_readers.pop_back();
range_reader = std::move(pre_range_readers.back());
pre_range_readers.pop_back();
}
}

View File

@ -89,6 +89,20 @@ protected:
static void
injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings);
static void initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader,
std::deque<MergeTreeRangeReader> & pre_range_readers,
PrewhereInfoPtr prewhere_info,
const PrewhereExprInfo * prewhere_actions,
IMergeTreeReader * reader,
bool has_lightweight_delete,
const MergeTreeReaderSettings & reader_settings,
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
const PrewhereExprStep & lightweight_delete_filter_step,
const Names & non_const_virtual_column_names);
/// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <Core/NamesAndTypes.h>
#include <Common/checkStackSize.h>
@ -28,8 +29,8 @@ namespace
bool injectRequiredColumnsRecursively(
const String & column_name,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::AlterConversions & alter_conversions,
const MergeTreeData::DataPartPtr & part,
const AlterConversions & alter_conversions,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const GetColumnsOptions & options,
Names & columns,
NameSet & required_columns,
@ -47,7 +48,7 @@ bool injectRequiredColumnsRecursively(
if (alter_conversions.isColumnRenamed(column_name_in_part))
column_name_in_part = alter_conversions.getColumnOldName(column_name_in_part);
auto column_in_part = part->getColumns().tryGetByName(column_name_in_part);
auto column_in_part = data_part_info_for_reader.getColumns().tryGetByName(column_name_in_part);
if (column_in_part
&& (!column_in_storage->isSubcolumn()
@ -78,7 +79,7 @@ bool injectRequiredColumnsRecursively(
bool result = false;
for (const auto & identifier : identifiers)
result |= injectRequiredColumnsRecursively(
identifier, storage_snapshot, alter_conversions, part,
identifier, storage_snapshot, alter_conversions, data_part_info_for_reader,
options, columns, required_columns, injected_columns);
return result;
@ -87,9 +88,8 @@ bool injectRequiredColumnsRecursively(
}
NameSet injectRequiredColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & part,
bool with_subcolumns,
Names & columns)
{
@ -97,9 +97,9 @@ NameSet injectRequiredColumns(
NameSet injected_columns;
bool have_at_least_one_physical_column = false;
MergeTreeData::AlterConversions alter_conversions;
if (!part->isProjectionPart())
alter_conversions = storage.getAlterConversionsForPart(part);
AlterConversions alter_conversions;
if (!data_part_info_for_reader.isProjectionPart())
alter_conversions = data_part_info_for_reader.getAlterConversions();
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical)
.withExtendedObjects()
@ -115,7 +115,7 @@ NameSet injectRequiredColumns(
have_at_least_one_physical_column |= injectRequiredColumnsRecursively(
columns[i], storage_snapshot, alter_conversions,
part, options, columns, required_columns, injected_columns);
data_part_info_for_reader, options, columns, required_columns, injected_columns);
}
/** Add a column of the minimum size.
@ -124,7 +124,7 @@ NameSet injectRequiredColumns(
*/
if (!have_at_least_one_physical_column)
{
const auto minimum_size_column_name = part->getColumnNameWithMinimumCompressedSize(with_subcolumns);
const auto minimum_size_column_name = data_part_info_for_reader.getColumnNameWithMinimumCompressedSize(with_subcolumns);
columns.push_back(minimum_size_column_name);
/// correctly report added column
injected_columns.insert(columns.back());
@ -135,13 +135,22 @@ NameSet injectRequiredColumns(
MergeTreeReadTask::MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
const MergeTreeData::DataPartPtr & data_part_,
const MarkRanges & mark_ranges_,
size_t part_index_in_query_,
const Names & ordered_names_,
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_)
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
ordered_names{ordered_names_}, column_name_set{column_name_set_}, task_columns{task_columns_},
remove_prewhere_column{remove_prewhere_column_}, size_predictor{std::move(size_predictor_)}
: data_part{data_part_}
, mark_ranges{mark_ranges_}
, part_index_in_query{part_index_in_query_}
, ordered_names{ordered_names_}
, column_name_set{column_name_set_}
, task_columns{task_columns_}
, remove_prewhere_column{remove_prewhere_column_}
, size_predictor{std::move(size_predictor_)}
{
}
@ -270,9 +279,8 @@ void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Colum
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,
@ -284,13 +292,13 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part
for (const auto & name : system_columns)
{
if (data_part->getColumns().contains(name))
if (data_part_info_for_reader.getColumns().contains(name))
column_names.push_back(name);
}
/// inject columns required for defaults evaluation
injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, column_names);
data_part_info_for_reader, storage_snapshot, with_subcolumns, column_names);
MergeTreeReadTaskColumns result;
auto options = GetColumnsOptions(GetColumnsOptions::All)
@ -316,7 +324,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
const auto injected_pre_columns = injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, all_pre_column_names);
data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names);
for (const auto & name : all_pre_column_names)
{

View File

@ -12,6 +12,7 @@ namespace DB
class MergeTreeData;
struct MergeTreeReadTask;
struct MergeTreeBlockSizePredictor;
class IMergeTreeDataPartInfoForReader;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
using MergeTreeBlockSizePredictorPtr = std::shared_ptr<MergeTreeBlockSizePredictor>;
@ -23,9 +24,8 @@ using MergeTreeBlockSizePredictorPtr = std::shared_ptr<MergeTreeBlockSizePredict
* Adds them to the `columns`.
*/
NameSet injectRequiredColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & part,
bool with_subcolumns,
Names & columns);
@ -68,16 +68,19 @@ struct MergeTreeReadTask
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
const MergeTreeData::DataPartPtr & data_part_,
const MarkRanges & mark_ranges_,
size_t part_index_in_query_,
const Names & ordered_names_,
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_);
};
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,

View File

@ -6671,7 +6671,7 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
return true;
}
MergeTreeData::AlterConversions MergeTreeData::getAlterConversionsForPart(const MergeTreeDataPartPtr part) const
AlterConversions MergeTreeData::getAlterConversionsForPart(const MergeTreeDataPartPtr part) const
{
MutationCommands commands = getFirstAlterMutationCommandsForPart(part);

View File

@ -24,6 +24,7 @@
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
@ -167,20 +168,6 @@ public:
STRONG_TYPEDEF(String, PartitionID)
/// Alter conversions which should be applied on-fly for part. Build from of
/// the most recent mutation commands for part. Now we have only rename_map
/// here (from ALTER_RENAME) command, because for all other type of alters
/// we can deduce conversions for part from difference between
/// part->getColumns() and storage->getColumns().
struct AlterConversions
{
/// Rename map new_name -> old_name
std::unordered_map<String, String> rename_map;
bool isColumnRenamed(const String & new_name) const { return rename_map.count(new_name) > 0; }
String getColumnOldName(const String & new_name) const { return rename_map.at(new_name); }
};
struct LessDataPart
{
using is_transparent = void;

View File

@ -2,6 +2,7 @@
#include <DataTypes/NestedUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
namespace DB
@ -45,9 +46,9 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartCompact>(shared_from_this());
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
return std::make_unique<MergeTreeReaderCompact>(
ptr, columns_to_read, metadata_snapshot, uncompressed_cache,
read_info, columns_to_read, metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
}
@ -90,39 +91,44 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
total_size.marks += mrk_checksum->second.file_size;
}
void MergeTreeDataPartCompact::loadIndexGranularity()
void MergeTreeDataPartCompact::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
const NamesAndTypesList & columns_, const DataPartStoragePtr & data_part_storage_)
{
//String full_path = getRelativePath();
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
if (!index_granularity_info.is_adaptive)
if (!index_granularity_info_.is_adaptive)
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
auto marks_file_path = index_granularity_info.getMarksFilePath("data");
if (!data_part_storage->exists(marks_file_path))
auto marks_file_path = index_granularity_info_.getMarksFilePath("data");
if (!data_part_storage_->exists(marks_file_path))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"Marks file '{}' doesn't exist",
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
auto buffer = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
while (!buffer->eof())
{
/// Skip offsets for columns
buffer->seek(columns.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
buffer->seek(columns_.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
size_t granularity;
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
index_granularity_.appendMark(granularity);
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes(columns.size()) != marks_file_size)
if (index_granularity_.getMarksCount() * index_granularity_info_.getMarkSizeInBytes(columns_.size()) != marks_file_size)
throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
index_granularity.setInitialized();
index_granularity_.setInitialized();
}
void MergeTreeDataPartCompact::loadIndexGranularity()
{
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns, data_part_storage);
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const

View File

@ -65,6 +65,11 @@ public:
~MergeTreeDataPartCompact() override;
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
const NamesAndTypesList & columns_, const DataPartStoragePtr & data_part_storage_);
private:
void checkConsistency(bool require_part_metadata) const override;

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterInMemory.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
@ -48,9 +49,10 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
const ValueSizeMap & /* avg_value_size_hints */,
const ReadBufferFromFileBase::ProfileCallback & /* profile_callback */) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeReaderInMemory>(
ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
read_info, ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <Core/NamesAndTypes.h>
@ -47,9 +48,9 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartWide>(shared_from_this());
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
return std::make_unique<MergeTreeReaderWide>(
ptr, columns_to_read,
read_info, columns_to_read,
metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
@ -103,46 +104,52 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
return size;
}
void MergeTreeDataPartWide::loadIndexGranularity()
void MergeTreeDataPartWide::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const DataPartStoragePtr & data_part_storage_, const std::string & any_column_file_name)
{
index_granularity_info.changeGranularityIfRequired(data_part_storage);
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
index_granularity_info_.changeGranularityIfRequired(data_part_storage_);
/// We can use any column, it doesn't matter
std::string marks_file_path = index_granularity_info.getMarksFilePath(getFileNameForColumn(columns.front()));
if (!data_part_storage->exists(marks_file_path))
std::string marks_file_path = index_granularity_info_.getMarksFilePath(any_column_file_name);
if (!data_part_storage_->exists(marks_file_path))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART, "Marks file '{}' doesn't exist",
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
if (!index_granularity_info.is_adaptive)
if (!index_granularity_info_.is_adaptive)
{
size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
size_t marks_count = marks_file_size / index_granularity_info_.getMarkSizeInBytes();
index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same
}
else
{
auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
auto buffer = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
while (!buffer->eof())
{
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
size_t granularity;
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
index_granularity_.appendMark(granularity);
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
if (index_granularity_.getMarksCount() * index_granularity_info_.getMarkSizeInBytes() != marks_file_size)
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}",
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
}
index_granularity.setInitialized();
index_granularity_.setInitialized();
}
void MergeTreeDataPartWide::loadIndexGranularity()
{
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
loadIndexGranularityImpl(index_granularity, index_granularity_info, data_part_storage, getFileNameForColumn(columns.front()));
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const

View File

@ -61,6 +61,11 @@ public:
bool hasColumnFiles(const NameAndTypePair & column) const override;
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const DataPartStoragePtr & data_part_storage_, const std::string & any_column_file_name);
private:
void checkConsistency(bool require_part_metadata) const override;

View File

@ -29,6 +29,8 @@ public:
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
MergeTreeIndexGranularityInfo(MergeTreeDataPartType type_, bool is_adaptive_, size_t index_granularity_, size_t index_granularity_bytes_);
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
String getMarksFilePath(const String & path_prefix) const

View File

@ -83,7 +83,7 @@ MergeTreeRangeReader::DelayedStream::DelayedStream(
: current_mark(from_mark), current_offset(0), num_delayed_rows(0)
, current_task_last_mark(current_task_last_mark_)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, continue_reading(false), is_finished(false)
{
}
@ -181,7 +181,7 @@ MergeTreeRangeReader::Stream::Stream(
: current_mark(from_mark), offset_after_current_mark(0)
, last_mark(to_mark)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, current_mark_index_granularity(index_granularity->getMarkRows(from_mark))
, stream(from_mark, current_task_last_mark, merge_tree_reader)
{
@ -652,7 +652,7 @@ MergeTreeRangeReader::MergeTreeRangeReader(
bool last_reader_in_chain_,
const Names & non_const_virtual_column_names_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, prev_reader(prev_reader_)
, prewhere_info(prewhere_info_)
, last_reader_in_chain(last_reader_in_chain_)
@ -946,7 +946,8 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
result.addRows(stream.finalize(result.columns));
/// Last granule may be incomplete.
result.adjustLastGranule();
if (!result.rowsPerGranule().empty())
result.adjustLastGranule();
for (const auto & column_name : non_const_virtual_column_names)
{

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Common/formatReadable.h>
#include <base/range.h>
@ -22,7 +23,6 @@ MergeTreeReadPool::MergeTreeReadPool(
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
@ -32,7 +32,6 @@ MergeTreeReadPool::MergeTreeReadPool(
bool do_not_steal_tasks_)
: backoff_settings{backoff_settings_}
, backoff_state{threads_}
, data{data_}
, storage_snapshot{storage_snapshot_}
, column_names{column_names_}
, virtual_column_names{virtual_column_names_}
@ -214,7 +213,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
per_part_sum_marks.push_back(sum_marks);
auto task_columns = getReadTaskColumns(
data, storage_snapshot, part.data_part,
LoadedMergeTreeDataPartInfoForReader(part.data_part), storage_snapshot,
column_names, virtual_column_names, prewhere_info, /*with_subcolumns=*/ true);
auto size_predictor = !predict_block_size_bytes ? nullptr

View File

@ -70,11 +70,16 @@ private:
public:
MergeTreeReadPool(
size_t threads_, size_t sum_marks_, size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageSnapshotPtr & storage_snapshot_,
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_, const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const Names & column_names_,
const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_,
size_t preferred_block_size_bytes_,
bool do_not_steal_tasks_ = false);
MergeTreeReadTaskPtr getTask(size_t min_marks_to_read, size_t thread, const Names & ordered_names);
@ -94,7 +99,6 @@ private:
size_t threads, size_t sum_marks, std::vector<size_t> per_part_sum_marks,
const RangesInDataParts & parts, size_t min_marks_for_concurrent_read);
const MergeTreeData & data;
StorageSnapshotPtr storage_snapshot;
const Names column_names;
const Names virtual_column_names;

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
MergeTreeReaderCompact::MergeTreeReaderCompact(
DataPartCompactPtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -26,7 +26,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: IMergeTreeReader(
data_part_,
data_part_info_for_read_,
columns_,
metadata_snapshot_,
uncompressed_cache_,
@ -35,14 +35,14 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
settings_,
avg_value_size_hints_)
, marks_loader(
data_part->data_part_storage,
data_part_info_for_read_->getDataPartStorage(),
mark_cache,
data_part->index_granularity_info.getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(),
data_part->index_granularity_info,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),
data_part_info_for_read_->getIndexGranularityInfo(),
settings.save_marks_in_cache,
settings.read_settings,
data_part->getColumns().size())
data_part_info_for_read_->getColumns().size())
{
try
{
@ -64,7 +64,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
continue;
}
auto position = data_part->getColumnPosition(column_to_read.getNameInStorage());
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
{
/// If array of Nested column is missing in part,
@ -77,7 +77,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges);
if (buffer_size)
settings.read_settings = settings.read_settings.adjustBufferSize(buffer_size);
@ -88,10 +88,10 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
std::string(fs::path(data_part->data_part_storage->getFullPath()) / path),
std::string(fs::path(data_part_info_for_read->getDataPartStorage()->getFullPath()) / path),
[this, path]()
{
return data_part->data_part_storage->readFile(
return data_part_info_for_read->getDataPartStorage()->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt);
@ -113,7 +113,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->data_part_storage->readFile(
data_part_info_for_read->getDataPartStorage()->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt),
@ -132,7 +132,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
}
@ -156,7 +156,7 @@ size_t MergeTreeReaderCompact::readRows(
while (read_rows < max_rows_to_read)
{
size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
for (size_t pos = 0; pos < num_columns; ++pos)
{
@ -179,7 +179,7 @@ size_t MergeTreeReaderCompact::readRows(
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
@ -187,7 +187,7 @@ size_t MergeTreeReaderCompact::readRows(
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
}
@ -279,7 +279,7 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark)
{
size_t right_offset = 0;
if (last_mark < data_part->getMarksCount()) /// Otherwise read until the end of file
if (last_mark < data_part_info_for_read->getMarksCount()) /// Otherwise read until the end of file
right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (right_offset == 0)
@ -307,7 +307,7 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi
return false;
const auto & [last_mark, last_column] = *last_read_granule;
return (mark == last_mark && column_position == last_column + 1)
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1);
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part_info_for_read->getColumns().size() - 1);
}
namespace
@ -359,16 +359,16 @@ private:
}
size_t MergeTreeReaderCompact::getReadBufferSize(
const DataPartPtr & part,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges)
{
size_t buffer_size = 0;
size_t columns_num = column_positions.size();
size_t file_size = part->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
size_t file_size = data_part_info_for_reader.getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
MarksCounter counter(part->getMarksCount(), part->getColumns().size());
MarksCounter counter(data_part_info_for_reader.getMarksCount(), data_part_info_for_reader.getColumns().size());
for (const auto & mark_range : mark_ranges)
{

View File

@ -19,7 +19,7 @@ class MergeTreeReaderCompact : public IMergeTreeReader
{
public:
MergeTreeReaderCompact(
DataPartCompactPtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -67,7 +67,7 @@ private:
/// Returns maximal value of granule size in compressed file from @mark_ranges.
/// This value is used as size of read buffer.
static size_t getReadBufferSize(
const DataPartPtr & part,
const IMergeTreeDataPartInfoForReader & data_part_info_for_read_,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges);

View File

@ -16,13 +16,14 @@ namespace ErrorCodes
MergeTreeReaderInMemory::MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_)
: IMergeTreeReader(
data_part_,
data_part_info_for_read_,
columns_,
metadata_snapshot_,
nullptr,
@ -48,7 +49,7 @@ size_t MergeTreeReaderInMemory::readRows(
if (!continue_reading)
total_rows_read = 0;
size_t total_marks = data_part->index_granularity.getMarksCount();
size_t total_marks = data_part_info_for_read->getIndexGranularity().getMarksCount();
if (from_mark >= total_marks)
throw Exception("Mark " + toString(from_mark) + " is out of bound. Max mark: "
+ toString(total_marks), ErrorCodes::ARGUMENT_OUT_OF_BOUND);

View File

@ -15,6 +15,7 @@ class MergeTreeReaderInMemory : public IMergeTreeReader
{
public:
MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,

View File

@ -26,7 +26,7 @@ namespace ErrorCodes
}
MergeTreeReaderWide::MergeTreeReaderWide(
DataPartWidePtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -37,7 +37,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: IMergeTreeReader(
data_part_,
data_part_info_,
columns_,
metadata_snapshot_,
uncompressed_cache_,
@ -53,7 +53,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
}
@ -73,7 +73,7 @@ size_t MergeTreeReaderWide::readRows(
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;
if (data_part->data_part_storage->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
if (data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
@ -136,17 +136,17 @@ size_t MergeTreeReaderWide::readRows(
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + " "
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + " "
"from mark " + toString(from_mark) + " "
"with max_rows_to_read = " + toString(max_rows_to_read) + ")");
throw;
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
@ -167,7 +167,7 @@ void MergeTreeReaderWide::addStreams(
if (streams.contains(stream_name))
return;
bool data_file_exists = data_part->checksums.files.contains(stream_name + DATA_FILE_EXTENSION);
bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
/** If data file is missing then we will not try to open it.
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
@ -178,10 +178,10 @@ void MergeTreeReaderWide::addStreams(
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part->data_part_storage, stream_name, DATA_FILE_EXTENSION,
data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,
data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),
profile_callback, clock_type, is_lc_dict));
};

View File

@ -15,7 +15,7 @@ class MergeTreeReaderWide : public IMergeTreeReader
{
public:
MergeTreeReaderWide(
DataPartWidePtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Interpreters/Context.h>
@ -51,7 +52,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
void MergeTreeSelectProcessor::initializeReaders()
{
task_columns = getReadTaskColumns(
storage, storage_snapshot, data_part,
LoadedMergeTreeDataPartInfoForReader(data_part), storage_snapshot,
required_columns, virt_column_names, prewhere_info, /*with_subcolumns=*/ true);
/// Will be used to distinguish between PREWHERE and WHERE columns when applying filter

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Processors/Transforms/FilterTransform.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Context.h>
@ -102,7 +103,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addTotalRowsApprox(data_part->rows_count);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, storage_snapshot, data_part, /*with_subcolumns=*/ false, columns_to_read);
injectRequiredColumns(LoadedMergeTreeDataPartInfoForReader(data_part), storage_snapshot, /*with_subcolumns=*/ false, columns_to_read);
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)

View File

@ -1,6 +1,7 @@
#include "StorageSystemRemoteDataPaths.h"
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Columns/ColumnString.h>
@ -23,6 +24,8 @@ StorageSystemRemoteDataPaths::StorageSystemRemoteDataPaths(const StorageID & tab
{"cache_base_path", std::make_shared<DataTypeString>()},
{"local_path", std::make_shared<DataTypeString>()},
{"remote_path", std::make_shared<DataTypeString>()},
{"size", std::make_shared<DataTypeUInt64>()},
{"common_prefix_for_blobs", std::make_shared<DataTypeString>()},
{"cache_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
}));
setInMemoryMetadata(storage_metadata);
@ -44,6 +47,8 @@ Pipe StorageSystemRemoteDataPaths::read(
MutableColumnPtr col_cache_base_path = ColumnString::create();
MutableColumnPtr col_local_path = ColumnString::create();
MutableColumnPtr col_remote_path = ColumnString::create();
MutableColumnPtr col_size = ColumnUInt64::create();
MutableColumnPtr col_namespace = ColumnString::create();
MutableColumnPtr col_cache_paths = ColumnArray::create(ColumnString::create());
auto disks = context->getDisksMap();
@ -61,7 +66,7 @@ Pipe StorageSystemRemoteDataPaths::read(
if (!cache_base_path.empty())
cache = FileCacheFactory::instance().get(cache_base_path);
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
for (const auto & [local_path, common_prefox_for_objects, storage_objects] : remote_paths_by_local_path)
{
for (const auto & object : storage_objects)
{
@ -70,6 +75,8 @@ Pipe StorageSystemRemoteDataPaths::read(
col_cache_base_path->insert(cache_base_path);
col_local_path->insert(local_path);
col_remote_path->insert(object.absolute_path);
col_size->insert(object.bytes_size);
col_namespace->insert(common_prefox_for_objects);
if (cache)
{
@ -91,6 +98,8 @@ Pipe StorageSystemRemoteDataPaths::read(
res_columns.emplace_back(std::move(col_cache_base_path));
res_columns.emplace_back(std::move(col_local_path));
res_columns.emplace_back(std::move(col_remote_path));
res_columns.emplace_back(std::move(col_size));
res_columns.emplace_back(std::move(col_namespace));
res_columns.emplace_back(std::move(col_cache_paths));
UInt64 num_rows = res_columns.at(0)->size();