Merge branch 'master' into sensitive_data_masker_unittest_issue

commit 61b2e1c32c
Suzy Wang 2022-09-07 16:49:03 -04:00, committed by GitHub
50 changed files with 533 additions and 201 deletions


@ -143,6 +143,8 @@ include (cmake/add_warning.cmake)
if (COMPILER_CLANG)
# generate ranges for fast "addr2line" search
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# NOTE: clang has a bug and does not emit .debug_aranges with ThinLTO,
# so a custom ld.lld wrapper is shipped in the docker images.
set(COMPILER_FLAGS "${COMPILER_FLAGS} -gdwarf-aranges")
endif ()


@ -15,4 +15,5 @@ ClickHouse® is an open-source column-oriented database management system that a
* [Contacts](https://clickhouse.com/company/contact) can help you get your questions answered if there are any.
## Upcoming events
* [**v22.8 Release Webinar**](https://clickhouse.com/company/events/v22-8-release-webinar) Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release, provide live demos, and share a vision of what is coming on the roadmap.
* [**v22.9 Release Webinar**](https://clickhouse.com/company/events/v22-9-release-webinar) Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release, provide live demos, and share a vision of what is coming on the roadmap.
* [**ClickHouse for Analytics @ Barracuda Networks**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/288140358/) Join us for this in-person meetup hosted by our friends at Barracuda in the Bay Area.

cmake/ld.lld.in Executable file

@ -0,0 +1,17 @@
#!/usr/bin/env bash
# This is a workaround for a bug in llvm/clang
# that does not produce .debug_aranges with LTO.
#
# NOTE: this is a temporary solution that should be removed once [1] is
# resolved.
#
# [1]: https://discourse.llvm.org/t/clang-does-not-produce-full-debug-aranges-section-with-thinlto/64898/8
# NOTE: only -flto=thin is supported.
# NOTE: it is not possible to check whether -gdwarf-aranges was passed initially or not.
if [[ "$*" =~ -plugin-opt=thinlto ]]; then
exec "@LLD_PATH@" -mllvm -generate-arange-section "$@"
else
exec "@LLD_PATH@" "$@"
fi


@ -20,7 +20,7 @@ macro(clickhouse_split_debug_symbols)
COMMAND mkdir -p "${STRIP_DESTINATION_DIR}/bin"
COMMAND cp "${STRIP_BINARY_PATH}" "${STRIP_DESTINATION_DIR}/bin/${STRIP_TARGET}"
# Splits debug symbols into a separate file, leaves the binary untouched:
COMMAND "${OBJCOPY_PATH}" --only-keep-debug --compress-debug-sections "${STRIP_DESTINATION_DIR}/bin/${STRIP_TARGET}" "${STRIP_DESTINATION_DIR}/lib/debug/bin/${STRIP_TARGET}.debug"
COMMAND "${OBJCOPY_PATH}" --only-keep-debug "${STRIP_DESTINATION_DIR}/bin/${STRIP_TARGET}" "${STRIP_DESTINATION_DIR}/lib/debug/bin/${STRIP_TARGET}.debug"
COMMAND chmod 0644 "${STRIP_DESTINATION_DIR}/lib/debug/bin/${STRIP_TARGET}.debug"
# Strips binary, sections '.note' & '.comment' are removed in line with Debian's stripping policy: www.debian.org/doc/debian-policy/ch-files.html, section '.clickhouse.hash' is needed for integrity check:
COMMAND "${STRIP_PATH}" --remove-section=.comment --remove-section=.note --keep-section=.clickhouse.hash "${STRIP_DESTINATION_DIR}/bin/${STRIP_TARGET}"


@ -94,8 +94,13 @@ if (LINKER_NAME)
if (NOT LLD_PATH)
message (FATAL_ERROR "Using linker ${LINKER_NAME} but can't find its path.")
endif ()
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} --ld-path=${LLD_PATH}")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --ld-path=${LLD_PATH}")
# This is a temporary quirk to emit .debug_aranges with ThinLTO
set (LLD_WRAPPER "${CMAKE_CURRENT_BINARY_DIR}/ld.lld")
configure_file ("${CMAKE_CURRENT_SOURCE_DIR}/cmake/ld.lld.in" "${LLD_WRAPPER}" @ONLY)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} --ld-path=${LLD_WRAPPER}")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --ld-path=${LLD_WRAPPER}")
else ()
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=${LINKER_NAME}")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fuse-ld=${LINKER_NAME}")


@ -640,7 +640,8 @@ Result:
## date\_diff
Returns the difference between two dates or dates with time values.
The difference is calculated using relative units, e.g. the difference between `2022-01-01` and `2021-12-29` is 3 days for the day unit (see [toRelativeDayNum](#toRelativeDayNum)), 1 month for the month unit (see [toRelativeMonthNum](#toRelativeMonthNum)), and 1 year for the year unit (see [toRelativeYearNum](#toRelativeYearNum)).
**Syntax**
@ -692,6 +693,25 @@ Result:
└────────────────────────────────────────────────────────────────────────────────────────┘
```
Query:
``` sql
SELECT
toDate('2022-01-01') AS e,
toDate('2021-12-29') AS s,
dateDiff('day', s, e) AS day_diff,
    dateDiff('month', s, e) AS month_diff,
dateDiff('year', s, e) AS year_diff;
```
Result:
``` text
┌──────────e─┬──────────s─┬─day_diff─┬─month_diff─┬─year_diff─┐
│ 2022-01-01 │ 2021-12-29 │        3 │          1 │         1 │
└────────────┴────────────┴──────────┴────────────┴───────────┘
```
## date\_sub
Subtracts the time interval or date interval from the provided date or date with time.


@ -22,13 +22,13 @@ Elf::Elf(const std::string & path)
/// Check if it's an elf.
elf_size = in.buffer().size();
if (elf_size < sizeof(ElfEhdr))
throw Exception("The size of supposedly ELF file is too small", ErrorCodes::CANNOT_PARSE_ELF);
throw Exception(ErrorCodes::CANNOT_PARSE_ELF, "The size of supposedly ELF file '{}' is too small", path);
mapped = in.buffer().begin();
header = reinterpret_cast<const ElfEhdr *>(mapped);
if (memcmp(header->e_ident, "\x7F""ELF", 4) != 0)
throw Exception("The file is not ELF according to magic", ErrorCodes::CANNOT_PARSE_ELF);
throw Exception(ErrorCodes::CANNOT_PARSE_ELF, "The file '{}' is not ELF according to magic", path);
/// Get section header.
ElfOff section_header_offset = header->e_shoff;
@ -37,7 +37,7 @@ Elf::Elf(const std::string & path)
if (!section_header_offset
|| !section_header_num_entries
|| section_header_offset + section_header_num_entries * sizeof(ElfShdr) > elf_size)
throw Exception("The ELF is truncated (section header points after end of file)", ErrorCodes::CANNOT_PARSE_ELF);
throw Exception(ErrorCodes::CANNOT_PARSE_ELF, "The ELF '{}' is truncated (section header points after end of file)", path);
section_headers = reinterpret_cast<const ElfShdr *>(mapped + section_header_offset);
@ -48,11 +48,11 @@ Elf::Elf(const std::string & path)
});
if (!section_names_strtab)
throw Exception("The ELF doesn't have string table with section names", ErrorCodes::CANNOT_PARSE_ELF);
throw Exception(ErrorCodes::CANNOT_PARSE_ELF, "The ELF '{}' doesn't have string table with section names", path);
ElfOff section_names_offset = section_names_strtab->header.sh_offset;
if (section_names_offset >= elf_size)
throw Exception("The ELF is truncated (section names string table points after end of file)", ErrorCodes::CANNOT_PARSE_ELF);
throw Exception(ErrorCodes::CANNOT_PARSE_ELF, "The ELF '{}' is truncated (section names string table points after end of file)", path);
section_names = reinterpret_cast<const char *>(mapped + section_names_offset);
@ -64,7 +64,7 @@ Elf::Elf(const std::string & path)
if (!program_header_offset
|| !program_header_num_entries
|| program_header_offset + program_header_num_entries * sizeof(ElfPhdr) > elf_size)
throw Exception("The ELF is truncated (program header points after end of file)", ErrorCodes::CANNOT_PARSE_ELF);
throw Exception(ErrorCodes::CANNOT_PARSE_ELF, "The ELF '{}' is truncated (program header points after end of file)", path);
program_headers = reinterpret_cast<const ElfPhdr *>(mapped + program_header_offset);
}


@ -145,5 +145,11 @@ String FieldVisitorToString::operator() (const Object & x) const
}
String convertFieldToString(const Field & field)
{
if (field.getType() == Field::Types::Which::String)
return field.get<String>();
return applyVisitor(FieldVisitorToString(), field);
}
}


@ -31,5 +31,8 @@ public:
String operator() (const bool & x) const;
};
}
/// Get value from field and convert it to string.
/// Also remove quotes from strings.
String convertFieldToString(const Field & field);
}
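
A hedged usage sketch of the helper declared above (illustrative only; behavior inferred from the implementation in the .cpp hunk, and the include paths are assumptions): for a String field the raw value is returned without quotes, while every other field type falls through to FieldVisitorToString.

``` cpp
// Illustrative sketch, not part of this commit.
#include <Core/Field.h>
#include <Common/FieldVisitorToString.h>  // assumed header locations

using namespace DB;

void example()
{
    Field str = String("hello");
    Field num = UInt64(42);

    // FieldVisitorToString renders a quoted SQL-style literal:
    String quoted = applyVisitor(FieldVisitorToString(), str); // "'hello'"

    // convertFieldToString returns the raw value, with the quotes removed:
    String raw = convertFieldToString(str); // "hello"

    // Non-string fields take the visitor path in both functions:
    String n = convertFieldToString(num); // "42"
}
```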


@ -37,7 +37,7 @@ But because ClickHouse is linked with most of the symbols exported (-rdynamic fl
It allows getting source file names and line numbers from addresses. It is only available if you use the -g compiler option.
It is also used by default for ClickHouse builds, but because of its weight (about two gigabytes)
it is split into a separate binary and provided in the clickhouse-common-static-dbg package.
This separate binary is placed in /usr/lib/debug/usr/bin/clickhouse and is loaded automatically by tools like gdb, addr2line.
This separate binary is placed in /usr/lib/debug/usr/bin/clickhouse.debug and is loaded automatically by tools like gdb, addr2line.
When you build ClickHouse yourself, the debug info is not split and is present in a single huge binary.
What does ClickHouse use to provide good stack traces?
@ -391,10 +391,22 @@ void collectSymbolsFromELF(
std::filesystem::path local_debug_info_path = canonical_path.parent_path() / canonical_path.stem();
local_debug_info_path += ".debug";
std::filesystem::path debug_info_path = std::filesystem::path("/usr/lib/debug") / canonical_path.relative_path();
debug_info_path += ".debug";
if (std::filesystem::exists(local_debug_info_path))
/// NOTE: This is a workaround for the current packaging system.
///
/// nfpm cannot copy a file conditionally (only if it exists), so cmake
/// creates an empty .debug file instead; but if we try to load an empty
/// ELF file, a CANNOT_PARSE_ELF exception will be thrown from Elf::Elf.
auto exists_not_empty = [](const std::filesystem::path & path)
{
return std::filesystem::exists(path) && !std::filesystem::is_empty(path);
};
if (exists_not_empty(local_debug_info_path))
object_name = local_debug_info_path;
else if (std::filesystem::exists(debug_info_path))
else if (exists_not_empty(debug_info_path))
object_name = debug_info_path;
else if (build_id.size() >= 2)
{
@ -412,7 +424,7 @@ void collectSymbolsFromELF(
std::filesystem::path build_id_debug_info_path(
fmt::format("/usr/lib/debug/.build-id/{}/{}.debug", build_id_hex.substr(0, 2), build_id_hex.substr(2)));
if (std::filesystem::exists(build_id_debug_info_path))
if (exists_not_empty(build_id_debug_info_path))
object_name = build_id_debug_info_path;
else
object_name = canonical_path;


@ -44,15 +44,6 @@ struct AttributeConfiguration
using AttributeNameToConfiguration = std::unordered_map<std::string, AttributeConfiguration>;
/// Get value from field and convert it to string.
/// Also remove quotes from strings.
String getFieldAsString(const Field & field)
{
if (field.getType() == Field::Types::Which::String)
return field.get<String>();
return applyVisitor(FieldVisitorToString(), field);
}
String getAttributeExpression(const ASTDictionaryAttributeDeclaration * dict_attr)
{
if (!dict_attr->expression)
@ -61,7 +52,7 @@ String getAttributeExpression(const ASTDictionaryAttributeDeclaration * dict_att
/// EXPRESSION PROPERTY should be expression or string
String expression_str;
if (const auto * literal = dict_attr->expression->as<ASTLiteral>(); literal && literal->value.getType() == Field::Types::String)
expression_str = getFieldAsString(literal->value);
expression_str = convertFieldToString(literal->value);
else
expression_str = queryToString(dict_attr->expression);
@ -275,7 +266,7 @@ void buildSingleAttribute(
AutoPtr<Element> null_value_element(doc->createElement("null_value"));
String null_value_str;
if (dict_attr->default_value)
null_value_str = getFieldAsString(dict_attr->default_value->as<ASTLiteral>()->value);
null_value_str = convertFieldToString(dict_attr->default_value->as<ASTLiteral>()->value);
AutoPtr<Text> null_value(doc->createTextNode(null_value_str));
null_value_element->appendChild(null_value);
attribute_element->appendChild(null_value_element);
@ -452,7 +443,7 @@ void buildConfigurationFromFunctionWithKeyValueArguments(
}
else if (const auto * literal = pair->second->as<const ASTLiteral>())
{
AutoPtr<Text> value(doc->createTextNode(getFieldAsString(literal->value)));
AutoPtr<Text> value(doc->createTextNode(convertFieldToString(literal->value)));
current_xml_element->appendChild(value);
}
else if (const auto * list = pair->second->as<const ASTExpressionList>())
@ -473,7 +464,7 @@ void buildConfigurationFromFunctionWithKeyValueArguments(
Field value;
result->get(0, value);
AutoPtr<Text> text_value(doc->createTextNode(getFieldAsString(value)));
AutoPtr<Text> text_value(doc->createTextNode(convertFieldToString(value)));
current_xml_element->appendChild(text_value);
}
else
@ -519,7 +510,7 @@ void buildSourceConfiguration(
{
AutoPtr<Element> setting_change_element(doc->createElement(name));
settings_element->appendChild(setting_change_element);
AutoPtr<Text> setting_value(doc->createTextNode(getFieldAsString(value)));
AutoPtr<Text> setting_value(doc->createTextNode(convertFieldToString(value)));
setting_change_element->appendChild(setting_value);
}
}


@ -239,7 +239,16 @@ public:
}
/// For one local path there might be multiple remote paths in case of Log family engines.
using LocalPathWithObjectStoragePaths = std::pair<String, StoredObjects>;
struct LocalPathWithObjectStoragePaths
{
std::string local_path;
std::string common_prefix_for_objects;
StoredObjects objects;
LocalPathWithObjectStoragePaths(
const std::string & local_path_, const std::string & common_prefix_for_objects_, StoredObjects && objects_)
: local_path(local_path_), common_prefix_for_objects(common_prefix_for_objects_), objects(std::move(objects_)) {}
};
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
{


@ -127,7 +127,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
{
try
{
paths_map.emplace_back(local_path, getStorageObjects(local_path));
paths_map.emplace_back(local_path, metadata_storage->getObjectStorageRootPath(), getStorageObjects(local_path));
}
catch (const Exception & e)
{


@ -68,6 +68,14 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
}
}
void DiskObjectStorageMetadata::createFromSingleObject(const std::string & relative_path, size_t bytes_size, size_t ref_count_, bool read_only_)
{
storage_objects.emplace_back(relative_path, bytes_size);
total_size = bytes_size;
ref_count = ref_count_;
read_only = read_only_;
}
void DiskObjectStorageMetadata::deserializeFromString(const std::string & data)
{
ReadBufferFromString buf(data);


@ -50,6 +50,7 @@ public:
void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);
void createFromSingleObject(const std::string & relative_path, size_t bytes_size, size_t ref_count_, bool is_read_only_);
void serialize(WriteBuffer & buf, bool sync) const;
std::string serializeToString() const;


@ -179,7 +179,6 @@ Pipe ReadFromMergeTree::readFromPool(
sum_marks,
min_marks_for_concurrent_read,
std::move(parts_with_range),
data,
storage_snapshot,
prewhere_info,
required_columns,


@ -0,0 +1,24 @@
#pragma once
#include <string>
#include <unordered_map>
namespace DB
{
/// Alter conversions which should be applied on the fly for a part. Built from
/// the most recent mutation commands for the part. For now we only have rename_map
/// here (from the ALTER_RENAME command), because for all other types of alters
/// we can deduce the conversions for a part from the difference between
/// part->getColumns() and storage->getColumns().
struct AlterConversions
{
/// Rename map new_name -> old_name
std::unordered_map<std::string, std::string> rename_map;
bool isColumnRenamed(const std::string & new_name) const { return rename_map.count(new_name) > 0; }
std::string getColumnOldName(const std::string & new_name) const { return rename_map.at(new_name); }
};
}
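
A minimal sketch of how rename_map is consulted when resolving a column name in a part; this mirrors the pattern used in injectRequiredColumnsRecursively further down in this commit, though the helper function name here is hypothetical.

``` cpp
// Illustrative sketch, not part of this commit.
#include <Storages/MergeTree/AlterConversions.h>
#include <string>

/// Hypothetical helper: map a column's current (post-ALTER-RENAME) name
/// back to the name it has on disk inside the part.
std::string resolveNameInPart(const DB::AlterConversions & conversions, std::string name)
{
    if (conversions.isColumnRenamed(name))
        name = conversions.getColumnOldName(name); // read the on-disk (old) name
    return name;
}
```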


@ -0,0 +1,68 @@
#pragma once
#include <Interpreters/Context.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
class IDataPartStorage;
using DataPartStoragePtr = std::shared_ptr<IDataPartStorage>;
class MergeTreeIndexGranularity;
struct MergeTreeDataPartChecksums;
struct MergeTreeIndexGranularityInfo;
class ISerialization;
using SerializationPtr = std::shared_ptr<const ISerialization>;
/**
 * A class which contains all the information about a data part that is required
 * in order to use MergeTreeDataPartReaders.
 * It is a separate interface and not a simple struct because
 * otherwise it would need to copy all the information, which might not
 * even be used (for example, an IndexGranularity object is quite heavy).
 */
class IMergeTreeDataPartInfoForReader : public WithContext
{
public:
explicit IMergeTreeDataPartInfoForReader(ContextPtr context_) : WithContext(context_) {}
virtual ~IMergeTreeDataPartInfoForReader() = default;
virtual bool isCompactPart() const = 0;
virtual bool isWidePart() const = 0;
virtual bool isInMemoryPart() const = 0;
virtual bool isProjectionPart() const = 0;
virtual const DataPartStoragePtr & getDataPartStorage() const = 0;
virtual const NamesAndTypesList & getColumns() const = 0;
virtual std::optional<size_t> getColumnPosition(const String & column_name) const = 0;
virtual String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const = 0;
virtual const MergeTreeDataPartChecksums & getChecksums() const = 0;
virtual AlterConversions getAlterConversions() const = 0;
virtual size_t getMarksCount() const = 0;
virtual size_t getFileSizeOrZero(const std::string & file_name) const = 0;
virtual const MergeTreeIndexGranularityInfo & getIndexGranularityInfo() const = 0;
virtual const MergeTreeIndexGranularity & getIndexGranularity() const = 0;
virtual SerializationPtr getSerialization(const NameAndTypePair & column) const = 0;
virtual const SerializationInfoByName & getSerializationInfos() const = 0;
virtual void reportBroken() = 0;
};
using MergeTreeDataPartInfoForReaderPtr = std::shared_ptr<IMergeTreeDataPartInfoForReader>;
}


@ -23,7 +23,7 @@ namespace ErrorCodes
IMergeTreeReader::IMergeTreeReader(
const MergeTreeData::DataPartPtr & data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -31,19 +31,18 @@ IMergeTreeReader::IMergeTreeReader(
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_)
: data_part(data_part_)
: data_part_info_for_read(data_part_info_for_read_)
, avg_value_size_hints(avg_value_size_hints_)
, uncompressed_cache(uncompressed_cache_)
, mark_cache(mark_cache_)
, settings(settings_)
, storage(data_part_->storage)
, metadata_snapshot(metadata_snapshot_)
, all_mark_ranges(all_mark_ranges_)
, alter_conversions(storage.getAlterConversionsForPart(data_part))
, alter_conversions(data_part_info_for_read->getAlterConversions())
/// For wide parts convert plain arrays of Nested to subcolumns
/// to allow using a shared offsets column from the cache.
, requested_columns(isWidePart(data_part) ? Nested::convertToSubcolumns(columns_) : columns_)
, part_columns(isWidePart(data_part) ? Nested::collect(data_part->getColumns()) : data_part->getColumns())
, requested_columns(data_part_info_for_read->isWidePart() ? Nested::convertToSubcolumns(columns_) : columns_)
, part_columns(data_part_info_for_read->isWidePart() ? Nested::collect(data_part_info_for_read->getColumns()) : data_part_info_for_read->getColumns())
{
columns_to_read.reserve(requested_columns.size());
serializations.reserve(requested_columns.size());
@ -71,7 +70,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + ")");
throw;
}
}
@ -99,13 +98,13 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
auto dag = DB::evaluateMissingDefaults(
additional_columns, requested_columns, metadata_snapshot->getColumns(), storage.getContext());
additional_columns, requested_columns, metadata_snapshot->getColumns(), data_part_info_for_read->getContext());
if (dag)
{
dag->addMaterializingOutputActions();
auto actions = std::make_shared<
ExpressionActions>(std::move(dag),
ExpressionActionsSettings::fromSettings(storage.getContext()->getSettingsRef()));
ExpressionActionsSettings::fromSettings(data_part_info_for_read->getContext()->getSettingsRef()));
actions->execute(additional_columns);
}
@ -117,7 +116,7 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + ")");
throw;
}
}
@ -151,7 +150,7 @@ SerializationPtr IMergeTreeReader::getSerializationInPart(const NameAndTypePair
if (!column_in_part)
return IDataType::getSerialization(required_column);
const auto & infos = data_part->getSerializationInfos();
const auto & infos = data_part_info_for_read->getSerializationInfos();
if (auto it = infos.find(column_in_part->getNameInStorage()); it != infos.end())
return IDataType::getSerialization(*column_in_part, *it->second);
@ -187,7 +186,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
copy_block.insert({res_columns[pos], getColumnInPart(*name_and_type).type, name_and_type->name});
}
DB::performRequiredConversions(copy_block, requested_columns, storage.getContext());
DB::performRequiredConversions(copy_block, requested_columns, data_part_info_for_read->getContext());
/// Move columns from block.
name_and_type = requested_columns.begin();
@ -197,7 +196,7 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + ")");
throw;
}
}
@ -205,11 +204,11 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
{
String table_name = Nested::extractTableName(column_name);
for (const auto & part_column : data_part->getColumns())
for (const auto & part_column : data_part_info_for_read->getColumns())
{
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
{
auto position = data_part->getColumnPosition(part_column.getNameInStorage());
auto position = data_part_info_for_read->getColumnPosition(part_column.getNameInStorage());
if (position && Nested::extractTableName(part_column.name) == table_name)
return position;
}


@ -4,6 +4,8 @@
#include <Common/HashTable/HashMap.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
{
@ -20,7 +22,7 @@ public:
using DeserializeBinaryBulkStateMap = std::map<std::string, ISerialization::DeserializeBinaryBulkStatePtr>;
IMergeTreeReader(
const MergeTreeData::DataPartPtr & data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
const NamesAndTypesList & columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -57,7 +59,7 @@ public:
size_t getFirstMarkToRead() const { return all_mark_ranges.front().begin; }
MergeTreeData::DataPartPtr data_part;
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read;
protected:
/// Returns actual column name in part, which can differ from table metadata.
@ -86,7 +88,6 @@ protected:
MergeTreeReaderSettings settings;
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
MarkRanges all_mark_ranges;
@ -95,7 +96,7 @@ protected:
private:
/// Alter conversions, which must be applied on the fly if required
MergeTreeData::AlterConversions alter_conversions;
AlterConversions alter_conversions;
/// Columns that are requested to read.
NamesAndTypesList requested_columns;


@ -0,0 +1,55 @@
#pragma once
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class LoadedMergeTreeDataPartInfoForReader final : public IMergeTreeDataPartInfoForReader
{
public:
explicit LoadedMergeTreeDataPartInfoForReader(MergeTreeData::DataPartPtr data_part_)
: IMergeTreeDataPartInfoForReader(data_part_->storage.getContext())
, data_part(data_part_)
{}
bool isCompactPart() const override { return DB::isCompactPart(data_part); }
bool isWidePart() const override { return DB::isWidePart(data_part); }
bool isInMemoryPart() const override { return DB::isInMemoryPart(data_part); }
bool isProjectionPart() const override { return data_part->isProjectionPart(); }
const DataPartStoragePtr & getDataPartStorage() const override { return data_part->data_part_storage; }
const NamesAndTypesList & getColumns() const override { return data_part->getColumns(); }
std::optional<size_t> getColumnPosition(const String & column_name) const override { return data_part->getColumnPosition(column_name); }
AlterConversions getAlterConversions() const override { return data_part->storage.getAlterConversionsForPart(data_part); }
String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const override { return data_part->getColumnNameWithMinimumCompressedSize(with_subcolumns); }
const MergeTreeDataPartChecksums & getChecksums() const override { return data_part->checksums; }
void reportBroken() override { data_part->storage.reportBrokenPart(data_part); }
size_t getMarksCount() const override { return data_part->getMarksCount(); }
size_t getFileSizeOrZero(const std::string & file_name) const override { return data_part->getFileSizeOrZero(file_name); }
const MergeTreeIndexGranularityInfo & getIndexGranularityInfo() const override { return data_part->index_granularity_info; }
const MergeTreeIndexGranularity & getIndexGranularity() const override { return data_part->index_granularity; }
const SerializationInfoByName & getSerializationInfos() const override { return data_part->getSerializationInfos(); }
SerializationPtr getSerialization(const NameAndTypePair & column) const override { return data_part->getSerialization(column.name); }
private:
MergeTreeData::DataPartPtr data_part;
};
}


@ -36,4 +36,16 @@ size_t getLastMark(const MarkRanges & ranges)
return current_task_last_mark;
}
std::string toString(const MarkRanges & ranges)
{
std::string result;
for (const auto & mark_range : ranges)
{
if (!result.empty())
result += ", ";
result += "(" + std::to_string(mark_range.begin) + ", " + std::to_string(mark_range.end) + ")";
}
return result;
}
}
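
Given the implementation above, a quick usage sketch; the expected output is a comma-separated list of (begin, end) pairs. The brace-initialization of MarkRange is an assumption based on how its fields are used in the loop.

``` cpp
// Illustrative sketch, not part of this commit.
#include <Storages/MergeTree/MarkRange.h>
#include <string>

void example()
{
    DB::MarkRanges ranges = {DB::MarkRange{0, 10}, DB::MarkRange{20, 30}};
    // Per the implementation above: s == "(0, 10), (20, 30)"
    std::string s = DB::toString(ranges);
}
```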


@ -32,4 +32,6 @@ using MarkRanges = std::deque<MarkRange>;
*/
size_t getLastMark(const MarkRanges & ranges);
std::string toString(const MarkRanges & ranges);
}


@ -43,6 +43,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
, prewhere_actions(getPrewhereActions(prewhere_info, actions_settings))
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
@ -72,7 +73,12 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
header_without_virtual_columns.erase(*it);
}
}
}
std::unique_ptr<PrewhereExprInfo> MergeTreeBaseSelectProcessor::getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings)
{
std::unique_ptr<PrewhereExprInfo> prewhere_actions;
if (prewhere_info)
{
prewhere_actions = std::make_unique<PrewhereExprInfo>();
@ -100,6 +106,8 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
}
return prewhere_actions;
}
@ -262,45 +270,62 @@ void MergeTreeBaseSelectProcessor::initializeMergeTreeReadersForPart(
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
{
MergeTreeRangeReader* prev_reader = nullptr;
return initializeRangeReadersImpl(
current_task.range_reader, current_task.pre_range_readers, prewhere_info, prewhere_actions.get(),
reader.get(), current_task.data_part->hasLightweightDelete(), reader_settings,
pre_reader_for_step, lightweight_delete_filter_step, non_const_virtual_column_names);
}
void MergeTreeBaseSelectProcessor::initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader, std::deque<MergeTreeRangeReader> & pre_range_readers,
PrewhereInfoPtr prewhere_info, const PrewhereExprInfo * prewhere_actions,
IMergeTreeReader * reader, bool has_lightweight_delete, const MergeTreeReaderSettings & reader_settings,
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
const PrewhereExprStep & lightweight_delete_filter_step, const Names & non_const_virtual_column_names)
{
MergeTreeRangeReader * prev_reader = nullptr;
bool last_reader = false;
size_t pre_readers_shift = 0;
/// Add filtering step with lightweight delete mask
if (reader_settings.apply_deleted_mask && current_task.data_part->hasLightweightDelete())
if (reader_settings.apply_deleted_mask && has_lightweight_delete)
{
current_task.pre_range_readers.push_back(
MergeTreeRangeReader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_readers.back();
MergeTreeRangeReader pre_range_reader(pre_reader_for_step[0].get(), prev_reader, &lightweight_delete_filter_step, last_reader, non_const_virtual_column_names);
pre_range_readers.push_back(std::move(pre_range_reader));
prev_reader = &pre_range_readers.back();
pre_readers_shift++;
}
if (prewhere_info)
{
if (prewhere_actions->steps.size() + pre_readers_shift != pre_reader_for_step.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"PREWHERE steps count mismatch, actions: {}, readers: {}",
prewhere_actions->steps.size(), pre_reader_for_step.size());
}
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
{
last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size());
current_task.pre_range_readers.push_back(
MergeTreeRangeReader(pre_reader_for_step[i + pre_readers_shift].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names));
prev_reader = &current_task.pre_range_readers.back();
MergeTreeRangeReader current_reader(pre_reader_for_step[i + pre_readers_shift].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names);
pre_range_readers.push_back(std::move(current_reader));
prev_reader = &pre_range_readers.back();
}
}
if (!last_reader)
{
current_task.range_reader = MergeTreeRangeReader(reader.get(), prev_reader, nullptr, true, non_const_virtual_column_names);
range_reader = MergeTreeRangeReader(reader, prev_reader, nullptr, true, non_const_virtual_column_names);
}
else
{
/// If all columns are read by pre_range_readers, then move the last pre_range_reader into range_reader
current_task.range_reader = std::move(current_task.pre_range_readers.back());
current_task.pre_range_readers.pop_back();
range_reader = std::move(pre_range_readers.back());
pre_range_readers.pop_back();
}
}


@ -89,6 +89,20 @@ protected:
static void
injectVirtualColumns(Block & block, size_t row_count, MergeTreeReadTask * task, const DataTypePtr & partition_value_type, const Names & virtual_columns);
static std::unique_ptr<PrewhereExprInfo> getPrewhereActions(PrewhereInfoPtr prewhere_info, const ExpressionActionsSettings & actions_settings);
static void initializeRangeReadersImpl(
MergeTreeRangeReader & range_reader,
std::deque<MergeTreeRangeReader> & pre_range_readers,
PrewhereInfoPtr prewhere_info,
const PrewhereExprInfo * prewhere_actions,
IMergeTreeReader * reader,
bool has_lightweight_delete,
const MergeTreeReaderSettings & reader_settings,
const std::vector<std::unique_ptr<IMergeTreeReader>> & pre_reader_for_step,
const PrewhereExprStep & lightweight_delete_filter_step,
const Names & non_const_virtual_column_names);
/// Sets up data readers for each step of prewhere and where
void initializeMergeTreeReadersForPart(
MergeTreeData::DataPartPtr & data_part,


@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <Core/NamesAndTypes.h>
#include <Common/checkStackSize.h>
@ -28,8 +29,8 @@ namespace
bool injectRequiredColumnsRecursively(
const String & column_name,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::AlterConversions & alter_conversions,
const MergeTreeData::DataPartPtr & part,
const AlterConversions & alter_conversions,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const GetColumnsOptions & options,
Names & columns,
NameSet & required_columns,
@ -47,7 +48,7 @@ bool injectRequiredColumnsRecursively(
if (alter_conversions.isColumnRenamed(column_name_in_part))
column_name_in_part = alter_conversions.getColumnOldName(column_name_in_part);
auto column_in_part = part->getColumns().tryGetByName(column_name_in_part);
auto column_in_part = data_part_info_for_reader.getColumns().tryGetByName(column_name_in_part);
if (column_in_part
&& (!column_in_storage->isSubcolumn()
@ -78,7 +79,7 @@ bool injectRequiredColumnsRecursively(
bool result = false;
for (const auto & identifier : identifiers)
result |= injectRequiredColumnsRecursively(
identifier, storage_snapshot, alter_conversions, part,
identifier, storage_snapshot, alter_conversions, data_part_info_for_reader,
options, columns, required_columns, injected_columns);
return result;
@ -87,9 +88,8 @@ bool injectRequiredColumnsRecursively(
}
NameSet injectRequiredColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & part,
bool with_subcolumns,
Names & columns)
{
@ -97,9 +97,9 @@ NameSet injectRequiredColumns(
NameSet injected_columns;
bool have_at_least_one_physical_column = false;
MergeTreeData::AlterConversions alter_conversions;
if (!part->isProjectionPart())
alter_conversions = storage.getAlterConversionsForPart(part);
AlterConversions alter_conversions;
if (!data_part_info_for_reader.isProjectionPart())
alter_conversions = data_part_info_for_reader.getAlterConversions();
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical)
.withExtendedObjects()
@ -115,7 +115,7 @@ NameSet injectRequiredColumns(
have_at_least_one_physical_column |= injectRequiredColumnsRecursively(
columns[i], storage_snapshot, alter_conversions,
part, options, columns, required_columns, injected_columns);
data_part_info_for_reader, options, columns, required_columns, injected_columns);
}
/** Add a column of the minimum size.
@ -124,7 +124,7 @@ NameSet injectRequiredColumns(
*/
if (!have_at_least_one_physical_column)
{
const auto minimum_size_column_name = part->getColumnNameWithMinimumCompressedSize(with_subcolumns);
const auto minimum_size_column_name = data_part_info_for_reader.getColumnNameWithMinimumCompressedSize(with_subcolumns);
columns.push_back(minimum_size_column_name);
/// correctly report added column
injected_columns.insert(columns.back());
@ -135,13 +135,22 @@ NameSet injectRequiredColumns(
MergeTreeReadTask::MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
const MergeTreeData::DataPartPtr & data_part_,
const MarkRanges & mark_ranges_,
size_t part_index_in_query_,
const Names & ordered_names_,
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_)
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
ordered_names{ordered_names_}, column_name_set{column_name_set_}, task_columns{task_columns_},
remove_prewhere_column{remove_prewhere_column_}, size_predictor{std::move(size_predictor_)}
: data_part{data_part_}
, mark_ranges{mark_ranges_}
, part_index_in_query{part_index_in_query_}
, ordered_names{ordered_names_}
, column_name_set{column_name_set_}
, task_columns{task_columns_}
, remove_prewhere_column{remove_prewhere_column_}
, size_predictor{std::move(size_predictor_)}
{
}
@ -270,9 +279,8 @@ void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Colum
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,
@ -284,13 +292,13 @@ MergeTreeReadTaskColumns getReadTaskColumns(
/// Read system columns such as lightweight delete mask "_row_exists" if it is persisted in the part
for (const auto & name : system_columns)
{
if (data_part->getColumns().contains(name))
if (data_part_info_for_reader.getColumns().contains(name))
column_names.push_back(name);
}
/// inject columns required for defaults evaluation
injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, column_names);
data_part_info_for_reader, storage_snapshot, with_subcolumns, column_names);
MergeTreeReadTaskColumns result;
auto options = GetColumnsOptions(GetColumnsOptions::All)
@ -316,7 +324,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
const auto injected_pre_columns = injectRequiredColumns(
storage, storage_snapshot, data_part, with_subcolumns, all_pre_column_names);
data_part_info_for_reader, storage_snapshot, with_subcolumns, all_pre_column_names);
for (const auto & name : all_pre_column_names)
{


@ -12,6 +12,7 @@ namespace DB
class MergeTreeData;
struct MergeTreeReadTask;
struct MergeTreeBlockSizePredictor;
class IMergeTreeDataPartInfoForReader;
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
using MergeTreeBlockSizePredictorPtr = std::shared_ptr<MergeTreeBlockSizePredictor>;
@ -23,9 +24,8 @@ using MergeTreeBlockSizePredictorPtr = std::shared_ptr<MergeTreeBlockSizePredict
* Adds them to the `columns`.
*/
NameSet injectRequiredColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & part,
bool with_subcolumns,
Names & columns);
@ -68,16 +68,19 @@ struct MergeTreeReadTask
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
const MergeTreeData::DataPartPtr & data_part_,
const MarkRanges & mark_ranges_,
size_t part_index_in_query_,
const Names & ordered_names_,
const NameSet & column_name_set_,
const MergeTreeReadTaskColumns & task_columns_,
bool remove_prewhere_column_,
MergeTreeBlockSizePredictorPtr && size_predictor_);
};
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const Names & system_columns,
const PrewhereInfoPtr & prewhere_info,


@ -6673,7 +6673,7 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
return true;
}
MergeTreeData::AlterConversions MergeTreeData::getAlterConversionsForPart(const MergeTreeDataPartPtr part) const
AlterConversions MergeTreeData::getAlterConversionsForPart(const MergeTreeDataPartPtr part) const
{
MutationCommands commands = getFirstAlterMutationCommandsForPart(part);


@ -24,6 +24,7 @@
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
@ -167,20 +168,6 @@ public:
STRONG_TYPEDEF(String, PartitionID)
/// Alter conversions which should be applied on-fly for part. Build from of
/// the most recent mutation commands for part. Now we have only rename_map
/// here (from ALTER_RENAME) command, because for all other type of alters
/// we can deduce conversions for part from difference between
/// part->getColumns() and storage->getColumns().
struct AlterConversions
{
/// Rename map new_name -> old_name
std::unordered_map<String, String> rename_map;
bool isColumnRenamed(const String & new_name) const { return rename_map.contains(new_name); }
String getColumnOldName(const String & new_name) const { return rename_map.at(new_name); }
};
struct LessDataPart
{
using is_transparent = void;


@ -2,6 +2,7 @@
#include <DataTypes/NestedUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
namespace DB
@ -45,9 +46,9 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartCompact>(shared_from_this());
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
return std::make_unique<MergeTreeReaderCompact>(
ptr, columns_to_read, metadata_snapshot, uncompressed_cache,
read_info, columns_to_read, metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
}
@ -90,39 +91,44 @@ void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*eac
total_size.marks += mrk_checksum->second.file_size;
}
void MergeTreeDataPartCompact::loadIndexGranularity()
void MergeTreeDataPartCompact::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
const NamesAndTypesList & columns_, const DataPartStoragePtr & data_part_storage_)
{
//String full_path = getRelativePath();
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
if (!index_granularity_info.is_adaptive)
if (!index_granularity_info_.is_adaptive)
throw Exception("MergeTreeDataPartCompact cannot be created with non-adaptive granulary.", ErrorCodes::NOT_IMPLEMENTED);
auto marks_file_path = index_granularity_info.getMarksFilePath("data");
if (!data_part_storage->exists(marks_file_path))
auto marks_file_path = index_granularity_info_.getMarksFilePath("data");
if (!data_part_storage_->exists(marks_file_path))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART,
"Marks file '{}' doesn't exist",
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
auto buffer = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
while (!buffer->eof())
{
/// Skip offsets for columns
buffer->seek(columns.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
buffer->seek(columns_.size() * sizeof(MarkInCompressedFile), SEEK_CUR);
size_t granularity;
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
index_granularity_.appendMark(granularity);
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes(columns.size()) != marks_file_size)
if (index_granularity_.getMarksCount() * index_granularity_info_.getMarkSizeInBytes(columns_.size()) != marks_file_size)
throw Exception("Cannot read all marks from file " + marks_file_path, ErrorCodes::CANNOT_READ_ALL_DATA);
index_granularity.setInitialized();
index_granularity_.setInitialized();
}
void MergeTreeDataPartCompact::loadIndexGranularity()
{
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns, data_part_storage);
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const


@ -65,6 +65,11 @@ public:
~MergeTreeDataPartCompact() override;
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, const MergeTreeIndexGranularityInfo & index_granularity_info_,
const NamesAndTypesList & columns_, const DataPartStoragePtr & data_part_storage_);
private:
void checkConsistency(bool require_part_metadata) const override;


@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterInMemory.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
@ -48,9 +49,10 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
const ValueSizeMap & /* avg_value_size_hints */,
const ReadBufferFromFileBase::ProfileCallback & /* profile_callback */) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeReaderInMemory>(
ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
read_info, ptr, columns_to_read, metadata_snapshot, mark_ranges, reader_settings);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(


@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <Core/NamesAndTypes.h>
@ -47,9 +48,9 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto ptr = std::static_pointer_cast<const MergeTreeDataPartWide>(shared_from_this());
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
return std::make_unique<MergeTreeReaderWide>(
ptr, columns_to_read,
read_info, columns_to_read,
metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback);
@ -103,46 +104,52 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
return size;
}
void MergeTreeDataPartWide::loadIndexGranularity()
void MergeTreeDataPartWide::loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const DataPartStoragePtr & data_part_storage_, const std::string & any_column_file_name)
{
index_granularity_info.changeGranularityIfRequired(data_part_storage);
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
index_granularity_info_.changeGranularityIfRequired(data_part_storage_);
/// We can use any column, it doesn't matter
std::string marks_file_path = index_granularity_info.getMarksFilePath(getFileNameForColumn(columns.front()));
if (!data_part_storage->exists(marks_file_path))
std::string marks_file_path = index_granularity_info_.getMarksFilePath(any_column_file_name);
if (!data_part_storage_->exists(marks_file_path))
throw Exception(
ErrorCodes::NO_FILE_IN_DATA_PART, "Marks file '{}' doesn't exist",
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
size_t marks_file_size = data_part_storage->getFileSize(marks_file_path);
size_t marks_file_size = data_part_storage_->getFileSize(marks_file_path);
if (!index_granularity_info.is_adaptive)
if (!index_granularity_info_.is_adaptive)
{
size_t marks_count = marks_file_size / index_granularity_info.getMarkSizeInBytes();
index_granularity.resizeWithFixedGranularity(marks_count, index_granularity_info.fixed_index_granularity); /// all the same
size_t marks_count = marks_file_size / index_granularity_info_.getMarkSizeInBytes();
index_granularity_.resizeWithFixedGranularity(marks_count, index_granularity_info_.fixed_index_granularity); /// all the same
}
else
{
auto buffer = data_part_storage->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
auto buffer = data_part_storage_->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size, std::nullopt);
while (!buffer->eof())
{
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block
size_t granularity;
readIntBinary(granularity, *buffer);
index_granularity.appendMark(granularity);
index_granularity_.appendMark(granularity);
}
if (index_granularity.getMarksCount() * index_granularity_info.getMarkSizeInBytes() != marks_file_size)
if (index_granularity_.getMarksCount() * index_granularity_info_.getMarkSizeInBytes() != marks_file_size)
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}",
std::string(fs::path(data_part_storage->getFullPath()) / marks_file_path));
std::string(fs::path(data_part_storage_->getFullPath()) / marks_file_path));
}
index_granularity.setInitialized();
index_granularity_.setInitialized();
}
void MergeTreeDataPartWide::loadIndexGranularity()
{
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
loadIndexGranularityImpl(index_granularity, index_granularity_info, data_part_storage, getFileNameForColumn(columns.front()));
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const


@ -61,6 +61,11 @@ public:
bool hasColumnFiles(const NameAndTypePair & column) const override;
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,
const DataPartStoragePtr & data_part_storage_, const std::string & any_column_file_name);
private:
void checkConsistency(bool require_part_metadata) const override;


@ -29,6 +29,8 @@ public:
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
MergeTreeIndexGranularityInfo(MergeTreeDataPartType type_, bool is_adaptive_, size_t index_granularity_, size_t index_granularity_bytes_);
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
String getMarksFilePath(const String & path_prefix) const


@ -83,7 +83,7 @@ MergeTreeRangeReader::DelayedStream::DelayedStream(
: current_mark(from_mark), current_offset(0), num_delayed_rows(0)
, current_task_last_mark(current_task_last_mark_)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, continue_reading(false), is_finished(false)
{
}
@ -181,7 +181,7 @@ MergeTreeRangeReader::Stream::Stream(
: current_mark(from_mark), offset_after_current_mark(0)
, last_mark(to_mark)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, current_mark_index_granularity(index_granularity->getMarkRows(from_mark))
, stream(from_mark, current_task_last_mark, merge_tree_reader)
{
@ -652,7 +652,7 @@ MergeTreeRangeReader::MergeTreeRangeReader(
bool last_reader_in_chain_,
const Names & non_const_virtual_column_names_)
: merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
, index_granularity(&(merge_tree_reader->data_part_info_for_read->getIndexGranularity()))
, prev_reader(prev_reader_)
, prewhere_info(prewhere_info_)
, last_reader_in_chain(last_reader_in_chain_)
@ -946,7 +946,8 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
result.addRows(stream.finalize(result.columns));
/// Last granule may be incomplete.
result.adjustLastGranule();
if (!result.rowsPerGranule().empty())
result.adjustLastGranule();
for (const auto & column_name : non_const_virtual_column_names)
{


@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Common/formatReadable.h>
#include <base/range.h>
@ -22,7 +23,6 @@ MergeTreeReadPool::MergeTreeReadPool(
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_,
@ -32,7 +32,6 @@ MergeTreeReadPool::MergeTreeReadPool(
bool do_not_steal_tasks_)
: backoff_settings{backoff_settings_}
, backoff_state{threads_}
, data{data_}
, storage_snapshot{storage_snapshot_}
, column_names{column_names_}
, virtual_column_names{virtual_column_names_}
@ -214,7 +213,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
per_part_sum_marks.push_back(sum_marks);
auto task_columns = getReadTaskColumns(
data, storage_snapshot, part.data_part,
LoadedMergeTreeDataPartInfoForReader(part.data_part), storage_snapshot,
column_names, virtual_column_names, prewhere_info, /*with_subcolumns=*/ true);
auto size_predictor = !predict_block_size_bytes ? nullptr


@ -70,11 +70,16 @@ private:
public:
MergeTreeReadPool(
size_t threads_, size_t sum_marks_, size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageSnapshotPtr & storage_snapshot_,
size_t threads_,
size_t sum_marks_,
size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const Names & column_names_, const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const Names & column_names_,
const Names & virtual_column_names_,
const BackoffSettings & backoff_settings_,
size_t preferred_block_size_bytes_,
bool do_not_steal_tasks_ = false);
MergeTreeReadTaskPtr getTask(size_t min_marks_to_read, size_t thread, const Names & ordered_names);
@ -94,7 +99,6 @@ private:
size_t threads, size_t sum_marks, std::vector<size_t> per_part_sum_marks,
const RangesInDataParts & parts, size_t min_marks_for_concurrent_read);
const MergeTreeData & data;
StorageSnapshotPtr storage_snapshot;
const Names column_names;
const Names virtual_column_names;


@ -15,7 +15,7 @@ namespace ErrorCodes
MergeTreeReaderCompact::MergeTreeReaderCompact(
DataPartCompactPtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -26,7 +26,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: IMergeTreeReader(
data_part_,
data_part_info_for_read_,
columns_,
metadata_snapshot_,
uncompressed_cache_,
@ -35,14 +35,14 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
settings_,
avg_value_size_hints_)
, marks_loader(
data_part->data_part_storage,
data_part_info_for_read_->getDataPartStorage(),
mark_cache,
data_part->index_granularity_info.getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(),
data_part->index_granularity_info,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),
data_part_info_for_read_->getIndexGranularityInfo(),
settings.save_marks_in_cache,
settings.read_settings,
data_part->getColumns().size())
data_part_info_for_read_->getColumns().size())
{
try
{
@ -64,7 +64,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
continue;
}
auto position = data_part->getColumnPosition(column_to_read.getNameInStorage());
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_to_read.type.get()))
{
/// If array of Nested column is missing in part,
@ -77,7 +77,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
/// Do not use max_read_buffer_size; instead, lower the buffer size to the maximal granule size to avoid reading excess data.
auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges);
if (buffer_size)
settings.read_settings = settings.read_settings.adjustBufferSize(buffer_size);
@ -88,10 +88,10 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
std::string(fs::path(data_part->data_part_storage->getFullPath()) / path),
std::string(fs::path(data_part_info_for_read->getDataPartStorage()->getFullPath()) / path),
[this, path]()
{
return data_part->data_part_storage->readFile(
return data_part_info_for_read->getDataPartStorage()->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt);
@ -113,7 +113,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part->data_part_storage->readFile(
data_part_info_for_read->getDataPartStorage()->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt),
@ -132,7 +132,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
}
@ -156,7 +156,7 @@ size_t MergeTreeReaderCompact::readRows(
while (read_rows < max_rows_to_read)
{
size_t rows_to_read = data_part->index_granularity.getMarkRows(from_mark);
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
for (size_t pos = 0; pos < num_columns; ++pos)
{
@ -179,7 +179,7 @@ size_t MergeTreeReaderCompact::readRows(
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
@ -187,7 +187,7 @@ size_t MergeTreeReaderCompact::readRows(
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
}
@ -279,7 +279,7 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark)
{
size_t right_offset = 0;
if (last_mark < data_part->getMarksCount()) /// Otherwise read until the end of file
if (last_mark < data_part_info_for_read->getMarksCount()) /// Otherwise read until the end of file
right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (right_offset == 0)
@ -307,7 +307,7 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi
return false;
const auto & [last_mark, last_column] = *last_read_granule;
return (mark == last_mark && column_position == last_column + 1)
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1);
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part_info_for_read->getColumns().size() - 1);
}
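The predicate above encodes the compact-part layout: all columns of one granule are stored consecutively, so reading stays sequential either when moving to the next column within the same mark, or when wrapping from the last column of a mark to column 0 of the next mark. A tiny worked example mirroring the return expression (3 columns, positions 0..2; names are illustrative):

    #include <cassert>
    #include <cstddef>
    #include <optional>
    #include <utility>

    bool isContinuous(std::optional<std::pair<size_t, size_t>> last,
                      size_t mark, size_t column, size_t num_columns)
    {
        if (!last)
            return false;
        const auto & [last_mark, last_column] = *last;
        return (mark == last_mark && column == last_column + 1)
            || (mark == last_mark + 1 && column == 0 && last_column == num_columns - 1);
    }

    int main()
    {
        assert(isContinuous({{5, 0}}, 5, 1, 3));   /// next column, same mark
        assert(isContinuous({{5, 2}}, 6, 0, 3));   /// wrap to next mark after last column
        assert(!isContinuous({{5, 1}}, 6, 0, 3));  /// skipped column 2: requires a seek
        assert(!isContinuous(std::nullopt, 5, 0, 3));
    }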
namespace
@ -359,16 +359,16 @@ private:
}
size_t MergeTreeReaderCompact::getReadBufferSize(
const DataPartPtr & part,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges)
{
size_t buffer_size = 0;
size_t columns_num = column_positions.size();
size_t file_size = part->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
size_t file_size = data_part_info_for_reader.getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
MarksCounter counter(part->getMarksCount(), part->getColumns().size());
MarksCounter counter(data_part_info_for_reader.getMarksCount(), data_part_info_for_reader.getColumns().size());
for (const auto & mark_range : mark_ranges)
{

View File

@ -19,7 +19,7 @@ class MergeTreeReaderCompact : public IMergeTreeReader
{
public:
MergeTreeReaderCompact(
DataPartCompactPtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -67,7 +67,7 @@ private:
/// Returns the maximal granule size in the compressed file among @mark_ranges.
/// This value is used as the read buffer size.
static size_t getReadBufferSize(
const DataPartPtr & part,
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges);
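Per the comment above, the read buffer is sized to the largest compressed granule the query will touch rather than to max_read_buffer_size. A self-contained sketch of that computation, assuming each mark records its offset in the compressed file (simplified: the real code walks per-column marks through MergeTreeMarksLoader):

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    struct MarkSketch { size_t offset_in_compressed_file; };
    struct MarkRangeSketch { size_t begin; size_t end; };  /// half-open mark range

    /// Largest distance between consecutive mark offsets within the requested
    /// ranges: an upper bound on the compressed size of any granule we read.
    size_t maxGranuleCompressedSize(
        const std::vector<MarkSketch> & marks,
        const std::vector<MarkRangeSketch> & ranges,
        size_t file_size)
    {
        size_t max_size = 0;
        for (const auto & range : ranges)
        {
            for (size_t i = range.begin; i < range.end; ++i)
            {
                size_t right = (i + 1 < marks.size())
                    ? marks[i + 1].offset_in_compressed_file
                    : file_size;  /// last granule ends at end of file
                max_size = std::max(max_size, right - marks[i].offset_in_compressed_file);
            }
        }
        return max_size;
    }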

View File

@ -16,13 +16,14 @@ namespace ErrorCodes
MergeTreeReaderInMemory::MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_)
: IMergeTreeReader(
data_part_,
data_part_info_for_read_,
columns_,
metadata_snapshot_,
nullptr,
@ -48,7 +49,7 @@ size_t MergeTreeReaderInMemory::readRows(
if (!continue_reading)
total_rows_read = 0;
size_t total_marks = data_part->index_granularity.getMarksCount();
size_t total_marks = data_part_info_for_read->getIndexGranularity().getMarksCount();
if (from_mark >= total_marks)
throw Exception("Mark " + toString(from_mark) + " is out of bound. Max mark: "
+ toString(total_marks), ErrorCodes::ARGUMENT_OUT_OF_BOUND);

View File

@ -15,6 +15,7 @@ class MergeTreeReaderInMemory : public IMergeTreeReader
{
public:
MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,

View File

@ -26,7 +26,7 @@ namespace ErrorCodes
}
MergeTreeReaderWide::MergeTreeReaderWide(
DataPartWidePtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,
@ -37,7 +37,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: IMergeTreeReader(
data_part_,
data_part_info_,
columns_,
metadata_snapshot_,
uncompressed_cache_,
@ -53,7 +53,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
}
@ -73,7 +73,7 @@ size_t MergeTreeReaderWide::readRows(
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams;
if (data_part->data_part_storage->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
if (data_part_info_for_read->getDataPartStorage()->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
@ -136,17 +136,17 @@ size_t MergeTreeReaderWide::readRows(
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + " "
e.addMessage("(while reading from part " + data_part_info_for_read->getDataPartStorage()->getFullPath() + " "
"from mark " + toString(from_mark) + " "
"with max_rows_to_read = " + toString(max_rows_to_read) + ")");
throw;
}
catch (...)
{
storage.reportBrokenPart(data_part);
data_part_info_for_read->reportBroken();
throw;
}
@ -167,7 +167,7 @@ void MergeTreeReaderWide::addStreams(
if (streams.contains(stream_name))
return;
bool data_file_exists = data_part->checksums.files.contains(stream_name + DATA_FILE_EXTENSION);
bool data_file_exists = data_part_info_for_read->getChecksums().files.contains(stream_name + DATA_FILE_EXTENSION);
/** If data file is missing then we will not try to open it.
* It is necessary since it allows adding a new column to the table structure without creating new files for old parts.
@ -178,10 +178,10 @@ void MergeTreeReaderWide::addStreams(
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part->data_part_storage, stream_name, DATA_FILE_EXTENSION,
data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,
data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),
profile_callback, clock_type, is_lc_dict));
};
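Two comments in this file carry the design: streams may be requested in advance when the storage is remote, and a stream whose data file is absent from the part's checksums is simply skipped, which is what lets ALTER TABLE ... ADD COLUMN avoid rewriting old parts. A compact sketch of the skip-if-missing decision, assuming a checksums map keyed by file name (illustrative types, not the real Checksums class):

    #include <map>
    #include <memory>
    #include <string>

    struct StreamSketch { std::string file_name; };

    /// Returns nullptr when the stream's data file does not exist in the part.
    /// A missing file is not an error: the column was added after this part
    /// was written, and the reader fills it with default values instead.
    std::unique_ptr<StreamSketch> tryMakeStream(
        const std::map<std::string, size_t> & checksums_files,
        const std::string & stream_name,
        const std::string & data_file_extension)
    {
        if (!checksums_files.contains(stream_name + data_file_extension))
            return nullptr;
        return std::make_unique<StreamSketch>(StreamSketch{stream_name + data_file_extension});
    }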

View File

@ -15,7 +15,7 @@ class MergeTreeReaderWide : public IMergeTreeReader
{
public:
MergeTreeReaderWide(
DataPartWidePtr data_part_,
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
const StorageMetadataPtr & metadata_snapshot_,
UncompressedCache * uncompressed_cache_,

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Interpreters/Context.h>
@ -51,7 +52,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
void MergeTreeSelectProcessor::initializeReaders()
{
task_columns = getReadTaskColumns(
storage, storage_snapshot, data_part,
LoadedMergeTreeDataPartInfoForReader(data_part), storage_snapshot,
required_columns, virt_column_names, prewhere_info, /*with_subcolumns=*/ true);
/// Will be used to distinguish between PREWHERE and WHERE columns when applying filter

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Processors/Transforms/FilterTransform.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Context.h>
@ -102,7 +103,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addTotalRowsApprox(data_part->rows_count);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, storage_snapshot, data_part, /*with_subcolumns=*/ false, columns_to_read);
injectRequiredColumns(LoadedMergeTreeDataPartInfoForReader(data_part), storage_snapshot, /*with_subcolumns=*/ false, columns_to_read);
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)
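injectRequiredColumns (now taking the part-info adapter) guarantees the read set is non-empty, since a block with zero columns carries no row count. A hedged sketch of the idea; the "pick the smallest column" heuristic is an assumption about, not a quote of, the real implementation, which also handles missing Nested subcolumns:

    #include <cstddef>
    #include <string>
    #include <vector>

    struct PartColumnSketch { std::string name; size_t compressed_bytes; };

    /// If nothing usable was requested, read at least one physical column so
    /// every block carries a row count; picking the cheapest column keeps the
    /// overhead minimal (assumed heuristic).
    std::vector<std::string> injectRequiredColumnsSketch(
        const std::vector<PartColumnSketch> & part_columns,
        std::vector<std::string> requested)
    {
        if (!requested.empty() || part_columns.empty())
            return requested;

        const PartColumnSketch * cheapest = &part_columns.front();
        for (const auto & column : part_columns)
            if (column.compressed_bytes < cheapest->compressed_bytes)
                cheapest = &column;

        requested.push_back(cheapest->name);
        return requested;
    }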

View File

@ -1,6 +1,7 @@
#include "StorageSystemRemoteDataPaths.h"
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Columns/ColumnString.h>
@ -23,6 +24,8 @@ StorageSystemRemoteDataPaths::StorageSystemRemoteDataPaths(const StorageID & tab
{"cache_base_path", std::make_shared<DataTypeString>()},
{"local_path", std::make_shared<DataTypeString>()},
{"remote_path", std::make_shared<DataTypeString>()},
{"size", std::make_shared<DataTypeUInt64>()},
{"common_prefix_for_blobs", std::make_shared<DataTypeString>()},
{"cache_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
}));
setInMemoryMetadata(storage_metadata);
@ -44,6 +47,8 @@ Pipe StorageSystemRemoteDataPaths::read(
MutableColumnPtr col_cache_base_path = ColumnString::create();
MutableColumnPtr col_local_path = ColumnString::create();
MutableColumnPtr col_remote_path = ColumnString::create();
MutableColumnPtr col_size = ColumnUInt64::create();
MutableColumnPtr col_namespace = ColumnString::create();
MutableColumnPtr col_cache_paths = ColumnArray::create(ColumnString::create());
auto disks = context->getDisksMap();
@ -61,7 +66,7 @@ Pipe StorageSystemRemoteDataPaths::read(
if (!cache_base_path.empty())
cache = FileCacheFactory::instance().get(cache_base_path);
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
for (const auto & [local_path, common_prefix_for_objects, storage_objects] : remote_paths_by_local_path)
{
for (const auto & object : storage_objects)
{
@ -70,6 +75,8 @@ Pipe StorageSystemRemoteDataPaths::read(
col_cache_base_path->insert(cache_base_path);
col_local_path->insert(local_path);
col_remote_path->insert(object.absolute_path);
col_size->insert(object.bytes_size);
col_namespace->insert(common_prefix_for_objects);
if (cache)
{
@ -91,6 +98,8 @@ Pipe StorageSystemRemoteDataPaths::read(
res_columns.emplace_back(std::move(col_cache_base_path));
res_columns.emplace_back(std::move(col_local_path));
res_columns.emplace_back(std::move(col_remote_path));
res_columns.emplace_back(std::move(col_size));
res_columns.emplace_back(std::move(col_namespace));
res_columns.emplace_back(std::move(col_cache_paths));
UInt64 num_rows = res_columns.at(0)->size();
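The two new columns, size and common_prefix_for_blobs, follow the usual system-table pattern: one mutable column per output column, all grown in lockstep row by row, then moved into res_columns in exactly the order the metadata declared them. A stripped-down sketch of that lockstep invariant (plain containers stand in for ColumnUInt64/ColumnString):

    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    /// One vector per output column; addRow() keeps them the same length,
    /// which is the invariant the real code maintains by inserting into every
    /// column inside the same loop iteration.
    struct RemoteDataPathRowsSketch
    {
        std::vector<uint64_t> size;                        /// object.bytes_size
        std::vector<std::string> common_prefix_for_blobs;  /// per-disk blob prefix

        void addRow(uint64_t bytes_size, std::string common_prefix)
        {
            size.push_back(bytes_size);
            common_prefix_for_blobs.push_back(std::move(common_prefix));
            assert(size.size() == common_prefix_for_blobs.size());
        }
    };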

View File

@ -1,4 +1,5 @@
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-cpu-aarch64
-- Tags: no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-cpu-aarch64, disabled
-- Tag disabled: Parsing inlines may lead to "could not find abbreviation code" (FIXME)
SET allow_introspection_functions = 0;
SELECT addressToLineWithInlines(1); -- { serverError 446 }

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# NOTE: this test uses a stacktrace instead of addressToLineWithInlines() or
# similar, since that code uses (or might use) a different code path in the
# DWARF parser.
#
# Also note that this test is only meaningful if the CI packages are built
# with ThinLTO.
$CLICKHOUSE_LOCAL --stacktrace -q 'select throwIf(1)' |& grep -c 'Common/Exception.cpp:[0-9]*: DB::Exception::Exception'