polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-10-10 19:30:30 +03:00
parent 42c9ea9aa3
commit b433add65c
58 changed files with 1498 additions and 894 deletions

View File

@ -36,7 +36,7 @@ TTLBlockInputStream::TTLBlockInputStream(
{
if (force || isTTLExpired(ttl_info.min))
{
new_ttl_infos.columns_ttl.emplace(name, MergeTreeDataPart::TTLInfo{});
new_ttl_infos.columns_ttl.emplace(name, IMergeTreeDataPart::TTLInfo{});
empty_columns.emplace(name);
auto it = column_defaults.find(name);
@ -96,7 +96,7 @@ void TTLBlockInputStream::readSuffixImpl()
new_ttl_infos.updatePartMinMaxTTL(new_ttl_infos.table_ttl.min, new_ttl_infos.table_ttl.max);
data_part->ttl_infos = std::move(new_ttl_infos);
data_part->empty_columns = std::move(empty_columns);
data_part->expired_columns = std::move(empty_columns);
if (rows_removed)
LOG_INFO(log, "Removed " << rows_removed << " rows with expired TTL from part " << data_part->name);

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
#include <common/DateLUT.h>
@ -39,8 +39,8 @@ private:
time_t current_time;
bool force;
MergeTreeDataPart::TTLInfos old_ttl_infos;
MergeTreeDataPart::TTLInfos new_ttl_infos;
IMergeTreeDataPart::TTLInfos old_ttl_infos;
IMergeTreeDataPart::TTLInfos new_ttl_infos;
NameSet empty_columns;
size_t rows_removed = 0;

View File

@ -7,7 +7,7 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/PartLog.h>

View File

@ -51,7 +51,7 @@ struct PartLogElement
void appendToBlock(Block & block) const;
};
struct MergeTreeDataPart;
class IMergeTreeDataPart;
/// Instead of typedef - to allow forward declaration.
@ -59,7 +59,7 @@ class PartLog : public SystemLog<PartLogElement>
{
using SystemLog<PartLogElement>::SystemLog;
using MutableDataPartPtr = std::shared_ptr<MergeTreeDataPart>;
using MutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
public:

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/typeid_cast.h>
@ -268,8 +269,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name);
new_data_part->relative_path = relative_part_path;
MergeTreeData::MutableDataPartPtr new_data_part = createPart(data, reservation->getDisk(), part_name, relative_part_path);
new_data_part->is_temp = true;

View File

@ -0,0 +1,461 @@
#include "IMergeTreeDataPart.h"
#include <optional>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Core/Defines.h>
#include <Common/SipHash.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/localBackup.h>
#include <Compression/CompressionInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Poco/DirectoryIterator.h>
#include <common/logger_useful.h>
#include <common/JSON.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int NO_FILE_IN_DATA_PART;
extern const int EXPECTED_END_OF_FILE;
extern const int CORRUPTED_DATA;
extern const int NOT_FOUND_EXPECTED_DATA_PART;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int BAD_TTL_FILE;
extern const int CANNOT_UNLINK;
extern const int LOGICAL_ERROR;
extern const int DIRECTORY_ALREADY_EXISTS;
}
static ReadBufferFromFile openForReading(const String & path)
{
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const String & part_path)
{
size_t minmax_idx_size = data.minmax_idx_column_types.size();
parallelogram.reserve(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
String file_name = part_path + "minmax_" + escapeForFileName(data.minmax_idx_columns[i]) + ".idx";
ReadBufferFromFile file = openForReading(file_name);
const DataTypePtr & type = data.minmax_idx_column_types[i];
Field min_val;
type->deserializeBinary(min_val, file);
Field max_val;
type->deserializeBinary(max_val, file);
parallelogram.emplace_back(min_val, true, max_val, true);
}
initialized = true;
}
void IMergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & data, const String & part_path, Checksums & out_checksums) const
{
store(data.minmax_idx_columns, data.minmax_idx_column_types, part_path, out_checksums);
}
void IMergeTreeDataPart::MinMaxIndex::store(const Names & column_names, const DataTypes & data_types, const String & part_path, Checksums & out_checksums) const
{
if (!initialized)
throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < column_names.size(); ++i)
{
String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
const DataTypePtr & type = data_types.at(i);
WriteBufferFromFile out(part_path + file_name);
HashingWriteBuffer out_hashing(out);
type->serializeBinary(parallelogram[i].left, out_hashing);
type->serializeBinary(parallelogram[i].right, out_hashing);
out_hashing.next();
out_checksums.files[file_name].file_size = out_hashing.count();
out_checksums.files[file_name].file_hash = out_hashing.getHash();
}
}
void IMergeTreeDataPart::MinMaxIndex::update(const Block & block, const Names & column_names)
{
if (!initialized)
parallelogram.reserve(column_names.size());
for (size_t i = 0; i < column_names.size(); ++i)
{
Field min_value;
Field max_value;
const ColumnWithTypeAndName & column = block.getByName(column_names[i]);
column.column->getExtremes(min_value, max_value);
if (!initialized)
parallelogram.emplace_back(min_value, true, max_value, true);
else
{
parallelogram[i].left = std::min(parallelogram[i].left, min_value);
parallelogram[i].right = std::max(parallelogram[i].right, max_value);
}
}
initialized = true;
}
void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
{
if (!other.initialized)
return;
if (!initialized)
{
parallelogram = other.parallelogram;
initialized = true;
}
else
{
for (size_t i = 0; i < parallelogram.size(); ++i)
{
parallelogram[i].left = std::min(parallelogram[i].left, other.parallelogram[i].left);
parallelogram[i].right = std::max(parallelogram[i].right, other.parallelogram[i].right);
}
}
}
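For orientation, here is a minimal, self-contained sketch of the per-column range bookkeeping that MinMaxIndex::update and MinMaxIndex::merge implement above. The Range type and all names below are simplified stand-ins, not part of this commit:
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>
// Simplified stand-in for one entry of the 'parallelogram': a closed [min, max] range per column.
struct Range { long left; long right; };
struct SimpleMinMaxIndex
{
    bool initialized = false;
    std::vector<Range> parallelogram;
    // Widen each column's range with values from a new block (one value per column here).
    void update(const std::vector<long> & column_values)
    {
        if (!initialized)
            for (long v : column_values)
                parallelogram.push_back({v, v});
        else
            for (std::size_t i = 0; i < column_values.size(); ++i)
            {
                parallelogram[i].left = std::min(parallelogram[i].left, column_values[i]);
                parallelogram[i].right = std::max(parallelogram[i].right, column_values[i]);
            }
        initialized = true;
    }
    // Merging two indexes takes the union of ranges, column by column.
    void merge(const SimpleMinMaxIndex & other)
    {
        if (!other.initialized)
            return;
        if (!initialized)
        {
            parallelogram = other.parallelogram;
            initialized = true;
            return;
        }
        for (std::size_t i = 0; i < parallelogram.size(); ++i)
        {
            parallelogram[i].left = std::min(parallelogram[i].left, other.parallelogram[i].left);
            parallelogram[i].right = std::max(parallelogram[i].right, other.parallelogram[i].right);
        }
    }
};
int main()
{
    SimpleMinMaxIndex a, b;
    a.update({3, 10});
    b.update({1, 20});
    a.merge(b);
    assert(a.parallelogram[0].left == 1 && a.parallelogram[1].right == 20);
}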
IMergeTreeDataPart::IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: storage(storage_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, disk(disk_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage) {}
IMergeTreeDataPart::IMergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: storage(storage_)
, name(name_)
, info(info_)
, disk(disk_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage) {}
ColumnSize IMergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
{
return getColumnSizeImpl(column_name, type, nullptr);
}
ColumnSize IMergeTreeDataPart::getTotalColumnsSize() const
{
ColumnSize totals;
std::unordered_set<String> processed_substreams;
for (const NameAndTypePair & column : columns)
{
ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
totals.add(size);
}
return totals;
}
String IMergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) const
{
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
/// NOTE: getting min and max dates from the part name (instead of part data) because we want
/// the merged part name be determined only by source part names.
/// It is simpler this way when the real min and max dates for the block range can change
/// (e.g. after an ALTER DELETE command).
DayNum min_date;
DayNum max_date;
MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
return new_part_info.getPartNameV0(min_date, max_date);
}
else
return new_part_info.getPartName();
}
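For context on getNewName: getPartNameV0 produces the old-style name that embeds the min/max dates directly, which is why the merged part's name can depend only on the source part names. A hypothetical sketch of that layout, per the classic month-partitioned naming convention (the helper below is ours, not the commit's):
#include <cstdio>
#include <string>
// Hypothetical helper mirroring getPartNameV0: old-style names embed min/max dates
// as YYYYMMDD, followed by the block range and level.
std::string partNameV0(int min_date, int max_date, int min_block, int max_block, int level)
{
    char buf[64];
    std::snprintf(buf, sizeof(buf), "%08d_%08d_%d_%d_%d", min_date, max_date, min_block, max_block, level);
    return buf;
}
int main()
{
    // e.g. a level-1 part covering 2019-10-01..2019-10-31, blocks 1..5.
    std::printf("%s\n", partNameV0(20191001, 20191031, 1, 5, 1));  // 20191001_20191031_1_5_1
}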
DayNum IMergeTreeDataPart::getMinDate() const
{
if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].left.get<UInt64>());
else
return DayNum();
}
DayNum IMergeTreeDataPart::getMaxDate() const
{
if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].right.get<UInt64>());
else
return DayNum();
}
time_t IMergeTreeDataPart::getMinTime() const
{
if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].left.get<UInt64>();
else
return 0;
}
time_t IMergeTreeDataPart::getMaxTime() const
{
if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].right.get<UInt64>();
else
return 0;
}
IMergeTreeDataPart::~IMergeTreeDataPart()
{
// if (on_disk && (state == State::DeleteOnDestroy || is_temp))
// {
// try
// {
// std::string path = on_disk->getFullPath();
// Poco::File dir(path);
// if (!dir.exists())
// return;
// if (is_temp)
// {
// if (!startsWith(on_disk->getNameWithPrefix(), "tmp"))
// {
// LOG_ERROR(storage.log, "~DataPart() should remove part " << path
// << " but its name doesn't start with tmp. Too suspicious, keeping the part.");
// return;
// }
// }
// dir.remove(true);
// }
// catch (...)
// {
// tryLogCurrentException(__PRETTY_FUNCTION__);
// }
// }
}
UInt64 IMergeTreeDataPart::getIndexSizeInBytes() const
{
UInt64 res = 0;
for (const ColumnPtr & column : index)
res += column->byteSize();
return res;
}
UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const
{
UInt64 res = 0;
for (const ColumnPtr & column : index)
res += column->allocatedBytes();
return res;
}
String IMergeTreeDataPart::stateToString(IMergeTreeDataPart::State state)
{
switch (state)
{
case State::Temporary:
return "Temporary";
case State::PreCommitted:
return "PreCommitted";
case State::Committed:
return "Committed";
case State::Outdated:
return "Outdated";
case State::Deleting:
return "Deleting";
case State::DeleteOnDestroy:
return "DeleteOnDestroy";
}
__builtin_unreachable();
}
String IMergeTreeDataPart::stateString() const
{
return stateToString(state);
}
void IMergeTreeDataPart::assertState(const std::initializer_list<IMergeTreeDataPart::State> & affordable_states) const
{
if (!checkState(affordable_states))
{
String states_str;
for (auto affordable_state : affordable_states)
states_str += stateToString(affordable_state) + " ";
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
}
}
void IMergeTreeDataPart::assertOnDisk() const
{
if (!isStoredOnDisk())
throw Exception("Data part '" + name + "is not stored on disk", ErrorCodes::LOGICAL_ERROR);
}
UInt64 IMergeTreeDataPart::getMarksCount() const
{
return index_granularity.getMarksCount();
}
size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
if (checksum == checksums.files.end())
return 0;
return checksum->second.file_size;
}
String IMergeTreeDataPart::getFullPath() const
{
assertOnDisk();
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
return storage.getFullPathOnDisk(disk) + relative_path + "/";
}
UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
{
Poco::File cur(from);
if (cur.isFile())
return cur.getSize();
std::vector<std::string> files;
cur.list(files);
UInt64 res = 0;
for (const auto & file : files)
res += calculateTotalSizeOnDisk(from + file);
return res;
}
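For comparison, a hedged sketch of the same directory-size recursion written against std::filesystem instead of Poco (not the commit's code; behavior matches calculateTotalSizeOnDisk above under the assumption that only regular files contribute):
#include <cstdint>
#include <filesystem>
#include <iostream>
// Sum the sizes of all regular files under 'from'; a single file is its own size.
std::uintmax_t totalSizeOnDisk(const std::filesystem::path & from)
{
    namespace fs = std::filesystem;
    if (fs::is_regular_file(from))
        return fs::file_size(from);
    std::uintmax_t res = 0;
    for (const auto & entry : fs::recursive_directory_iterator(from))
        if (entry.is_regular_file())
            res += entry.file_size();
    return res;
}
int main(int argc, char ** argv)
{
    if (argc > 1)
        std::cout << totalSizeOnDisk(argv[1]) << " bytes\n";
}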
void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
{
String from = getFullPath();
String to = storage.getFullPathOnDisk(disk) + new_relative_path + "/";
Poco::File from_file(from);
if (!from_file.exists())
throw Exception("Part directory " + from + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
Poco::File to_file(to);
if (to_file.exists())
{
if (remove_new_dir_if_exists)
{
Names files;
Poco::File(from).list(files);
LOG_WARNING(storage.log, "Part directory " << to << " already exists"
<< " and contains " << files.size() << " files. Removing it.");
to_file.remove(true);
}
else
{
throw Exception("Part directory " + to + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
}
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
from_file.renameTo(to);
relative_path = new_relative_path;
}
String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
/// Do not allow underscores in the prefix because they are used as separators.
assert(prefix.find_first_of('_') == String::npos);
String res;
/** If you need to detach a part, and the directory into which we want to rename it already exists,
* we will rename it to a directory whose name has a suffix of the form "_tryN" appended.
* This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
* No more than 10 attempts are made so that there are not too many junk directories left.
*/
for (int try_no = 0; try_no < 10; try_no++)
{
res = "detached/" + (prefix.empty() ? "" : prefix + "_")
+ name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
return res;
LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
" Will detach to directory with '_tryN' suffix.");
}
return res;
}
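A minimal sketch of the "_tryN" collision-avoidance scheme used above, with the filesystem existence check abstracted into a callback (all names are hypothetical simplifications):
#include <functional>
#include <iostream>
#include <set>
#include <string>
// Mirrors getRelativePathForDetachedPart: try the plain name first, then up to
// nine '_tryN' suffixes, so failed detaches don't leave unbounded junk directories.
std::string detachedPath(const std::string & prefix, const std::string & part_name,
                         const std::function<bool(const std::string &)> & exists)
{
    std::string res;
    for (int try_no = 0; try_no < 10; ++try_no)
    {
        res = "detached/" + (prefix.empty() ? "" : prefix + "_") + part_name
            + (try_no ? "_try" + std::to_string(try_no) : "");
        if (!exists(res))
            return res;
    }
    return res;  // after 10 attempts, reuse the last candidate
}
int main()
{
    std::set<std::string> taken = {"detached/broken_all_1_1_0"};
    auto exists = [&](const std::string & p) { return taken.count(p) > 0; };
    std::cout << detachedPath("broken", "all_1_1_0", exists) << '\n';  // detached/broken_all_1_1_0_try1
}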
void IMergeTreeDataPart::renameToDetached(const String & prefix) const
{
assertOnDisk();
renameTo(getRelativePathForDetachedPart(prefix));
}
void IMergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
assertOnDisk();
LOG_INFO(storage.log, "Detaching " << relative_path);
Poco::Path src(getFullPath());
Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(src, dst, 0);
}
void IMergeTreeDataPart::makeCloneOnDiskDetached(const DiskSpace::ReservationPtr & reservation) const
{
assertOnDisk();
auto & reserved_disk = reservation->getDisk();
if (reserved_disk->getName() == disk->getName())
throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);
String path_to_clone = storage.getFullPathOnDisk(reserved_disk) + "detached/";
if (Poco::File(path_to_clone + relative_path).exists())
throw Exception("Path " + path_to_clone + relative_path + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
Poco::File(path_to_clone).createDirectory();
Poco::File cloning_directory(getFullPath());
cloning_directory.copyTo(path_to_clone);
}
}

View File

@ -1,9 +1,12 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Core/Row.h>
#include <Core/Block.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
@ -12,54 +15,146 @@
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/KeyCondition.h>
// #include <Storages/MergeTree/IMergeTreeReader.h>
// #include <Storages/MergeTree/MergeTreeWriter.h>
// #include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Columns/IColumn.h>
#include <Storages/MergeTree/MergeTreeReaderSettings.h>
#include <Poco/Path.h>
#include <shared_mutex>
namespace DB
{
struct ColumnSize;
class MergeTreeData;
struct ColumnSize;
class MergeTreeData;
class IMergeTreeReader;
/// Description of the data part.
struct MergeTreeDataPart
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
// class MergeTreeDataPartOnDisk;
class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPart>
{
public:
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
using ValueSizeMap = std::map<std::string, double>;
MergeTreeDataPart(const MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_, const MergeTreePartInfo & info_);
// virtual BlockInputStreamPtr readAll() = 0;
// virtual BlockInputStreamPtr read() = 0;
// virtual BlockInputStreamPtr readWithThreadPool() = 0;
// virtual BlockInputStreamPtr readReverse() = 0;
MergeTreeDataPart(MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_);
virtual MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns_,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const ReaderSettings & reader_settings_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{}) const = 0;
// virtual MergeTreeWriterPtr getWriter() const = 0;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
virtual bool isStoredOnDisk() const = 0;
virtual void remove() const = 0;
virtual bool supportsVerticalMerge() const { return false; }
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
ColumnSize getColumnSize(const String & name, const IDataType & type) const;
virtual ColumnSize getColumnSize(const String & name, const IDataType & type) const = 0;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
virtual void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency) = 0;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
virtual String getColumnNameWithMinumumCompressedSize() const = 0;
// virtual void detach() = 0;
// virtual Checksums check(
// bool require_checksums,
// const DataTypes & primary_key_data_types, /// Check the primary key. If it is not necessary, pass an empty array.
// const MergeTreeIndices & indices = {}, /// Check skip indices
// std::function<bool()> is_cancelled = []{ return false; })
// {
// return {};
// }
using ColumnToSize = std::map<std::string, UInt64>;
// void accumulateColumnSizes(ColumnToSize & column_to_size) const
// {
// throw Exception("Method 'accumulateColumnSizes' is not supported for data part with type " + typeToString(getType()), ErrorCodes::NOT_IMPLEMETED);
// }
enum class Type
{
WIDE,
STRIPED,
IN_MEMORY,
};
virtual Type getType() const = 0;
// virtual void renameTo() = 0;
static String typeToString(Type type)
{
switch (type)
{
case Type::WIDE:
return "Wide";
case Type::STRIPED:
return "Striped";
case Type::IN_MEMORY:
return "InMemory";
}
__builtin_unreachable();
}
String getTypeName() { return typeToString(getType()); }
virtual ~IMergeTreeDataPart();
IMergeTreeDataPart(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskSpace::DiskPtr & disk = {},
const std::optional<String> & relative_path = {});
IMergeTreeDataPart(
MergeTreeData & storage_,
const String & name_,
const DiskSpace::DiskPtr & disk = {},
const std::optional<String> & relative_path = {});
void assertOnDisk() const;
ColumnSize getTotalColumnsSize() const;
size_t getFileSizeOrZero(const String & file_name) const;
/// Returns full path to part dir
String getFullPath() const;
/// Returns part->name with prefixes like 'tmp_<name>'
String getNameWithPrefix() const;
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
/// This is useful when you want to change e.g. block numbers or the mutation version of the part.
String getNewName(const MergeTreePartInfo & new_part_info) const;
bool contains(const MergeTreeDataPart & other) const { return info.contains(other.info); }
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }
/// If the partition key includes date column (a common case), these functions will return min and max values for this column.
DayNum getMinDate() const;
@ -73,18 +168,19 @@ struct MergeTreeDataPart
const MergeTreeData & storage;
DiskSpace::DiskPtr disk;
String name;
MergeTreePartInfo info;
/// A directory path (relative to storage's path) where part data is actually stored
/// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
DiskSpace::DiskPtr disk;
mutable String relative_path;
size_t rows_count = 0;
std::atomic<UInt64> bytes_on_disk {0}; /// 0 - if not counted;
/// Is used from several threads without locks (it is changed with ALTER).
/// May not contain size of checksums.txt and columns.txt
time_t modification_time = 0;
/// When the part is removed from the working set. Changes once.
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
@ -152,24 +248,6 @@ struct MergeTreeDataPart
/// Throws an exception if state of the part is not in affordable_states
void assertState(const std::initializer_list<State> & affordable_states) const;
/// In comparison with lambdas, it is move assignable and can have several overloaded operator()
struct StatesFilter
{
std::initializer_list<State> affordable_states;
StatesFilter(const std::initializer_list<State> & affordable_states_) : affordable_states(affordable_states_) {}
bool operator() (const std::shared_ptr<const MergeTreeDataPart> & part) const
{
return part->checkState(affordable_states);
}
};
/// Returns a lambda that returns true only for part with states from specified list
static inline StatesFilter getStatesFilter(const std::initializer_list<State> & affordable_states)
{
return StatesFilter(affordable_states);
}
/// Primary key (correspond to primary.idx file).
/// Always loaded in RAM. Contains each index_granularity-th value of primary key tuple.
/// Note that marks (also correspond to primary key) is not always in RAM, but cached. See MarkCache.h.
@ -218,9 +296,7 @@ struct MergeTreeDataPart
NamesAndTypesList columns;
/// Columns whose values have all been zeroed out by expired TTL
NameSet empty_columns;
using ColumnToSize = std::map<std::string, UInt64>;
NameSet expired_columns;
/** It is blocked for writing when changing columns, checksums or any part files.
* Locked to read when reading columns, checksums or any part files.
@ -239,72 +315,31 @@ struct MergeTreeDataPart
MergeTreeIndexGranularityInfo index_granularity_info;
~MergeTreeDataPart();
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSizeOnDisk(const String & from);
void remove() const;
/// Makes checks and move part to new directory
/// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists = true) const;
/// Generate unique path to detach part
String getRelativePathForDetachedPart(const String & prefix) const;
/// Moves a part to detached/ directory and adds prefix to its name
void renameToDetached(const String & prefix) const;
/// Makes clone of a part in detached/ directory via hard links
void makeCloneInDetached(const String & prefix) const;
/// Makes full clone of part in detached/ on another disk
void makeCloneOnDiskDetached(const DiskSpace::ReservationPtr & reservation) const;
/// Populates columns_to_size map (compressed size).
void accumulateColumnSizes(ColumnToSize & column_to_size) const;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
/// Checks that .bin and .mrk files exist
bool hasColumnFiles(const String & column, const IDataType & type) const;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
UInt64 getMarksCount() const;
size_t getFileSizeOrZero(const String & file_name) const;
String getFullPath() const;
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists = false) const;
void renameToDetached(const String & prefix) const;
void makeCloneInDetached(const String & prefix) const;
void makeCloneOnDiskDetached(const DiskSpace::ReservationPtr & reservation) const;
/// Checks that .bin and .mrk files exist
bool hasColumnFiles(const String & /* column */, const IDataType & /* type */ ) const { return true; }
static UInt64 calculateTotalSizeOnDisk(const String & from);
private:
/// Reads columns names and types from columns.txt
void loadColumns(bool require);
/// If checksums.txt exists, reads files' checksums (and sizes) from it
void loadChecksums(bool require);
/// Loads marks index granularity into memory
void loadIndexGranularity();
/// Loads index file.
void loadIndex();
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
/// Loads TTL infos in JSON format from the file ttl.txt. If the file doesn't exist, assigns TTL infos with all zeros
void loadTTLInfos();
void loadPartitionAndMinMaxIndex();
String getRelativePathForDetachedPart(const String & prefix) const;
void checkConsistency(bool require_part_metadata);
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const;
virtual ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const = 0;
};
using MergeTreeDataPartState = MergeTreeDataPart::State;
using MergeTreeDataPartState = IMergeTreeDataPart::State;
}

View File

@ -0,0 +1,191 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/escapeForFileName.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Common/typeid_cast.h>
#include <Poco/File.h>
namespace DB
{
namespace
{
using OffsetColumns = std::map<std::string, ColumnPtr>;
}
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_FOUND_EXPECTED_DATA_PART;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int ARGUMENT_OUT_OF_BOUND;
}
IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
const MarkRanges & all_mark_ranges_, const ReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_)
: data_part(data_part_), avg_value_size_hints(avg_value_size_hints_), path(data_part_->getFullPath())
, columns(columns_), uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_)
, settings(settings_), storage(data_part_->storage)
, all_mark_ranges(all_mark_ranges_)
{
}
IMergeTreeReader::~IMergeTreeReader() = default;
const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints() const
{
return avg_value_size_hints;
}
static bool arrayHasNoElementsRead(const IColumn & column)
{
const ColumnArray * column_array = typeid_cast<const ColumnArray *>(&column);
if (!column_array)
return false;
size_t size = column_array->size();
if (!size)
return false;
size_t data_size = column_array->getData().size();
if (data_size)
return false;
size_t last_offset = column_array->getOffsets()[size - 1];
return last_offset != 0;
}
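The helper above detects a column where only the offsets substream was read: the offsets claim the rows have elements, but the nested data is empty. A standalone sketch of the same predicate over plain vectors (the types are simplified stand-ins, not the commit's code):
#include <cassert>
#include <cstdint>
#include <vector>
// Simplified ColumnArray: per-row cumulative end offsets into a flat nested data vector.
struct FakeArrayColumn
{
    std::vector<uint64_t> offsets;  // offsets[i] = total nested elements in rows 0..i
    std::vector<int> data;          // flattened nested values
};
// True when the offsets promise elements but no nested data was actually read.
bool arrayHasNoElementsRead(const FakeArrayColumn & col)
{
    if (col.offsets.empty())
        return false;                    // no rows at all
    if (!col.data.empty())
        return false;                    // nested data present: consistent
    return col.offsets.back() != 0;      // rows claim elements, data has none
}
int main()
{
    FakeArrayColumn only_offsets{{2, 5}, {}};              // 2 rows, 5 promised elements, none read
    FakeArrayColumn consistent{{2, 5}, {1, 2, 3, 4, 5}};
    assert(arrayHasNoElementsRead(only_offsets));
    assert(!arrayHasNoElementsRead(consistent));
}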
void IMergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults, size_t num_rows)
{
try
{
/// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length.
/// First, collect offset columns for all arrays in the block.
OffsetColumns offset_columns;
for (size_t i = 0; i < res.columns(); ++i)
{
const ColumnWithTypeAndName & column = res.safeGetByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(column.column.get()))
{
String offsets_name = Nested::extractTableName(column.name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason multiple offsets columns are present for the same nested data structure,
/// choose the one that is not empty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
}
}
should_evaluate_missing_defaults = false;
should_reorder = false;
/// insert default values only for columns without default expressions
for (const auto & requested_column : columns)
{
bool has_column = res.has(requested_column.name);
if (has_column)
{
const auto & col = *res.getByName(requested_column.name).column;
if (arrayHasNoElementsRead(col))
{
res.erase(requested_column.name);
has_column = false;
}
}
if (!has_column)
{
should_reorder = true;
if (storage.getColumns().hasDefault(requested_column.name))
{
should_evaluate_missing_defaults = true;
continue;
}
ColumnWithTypeAndName column_to_add;
column_to_add.name = requested_column.name;
column_to_add.type = requested_column.type;
String offsets_name = Nested::extractTableName(column_to_add.name);
if (offset_columns.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*column_to_add.type).getNestedType();
size_t nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
column_to_add.column = ColumnArray::create(nested_column, offsets_column);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere
/// but in some blocks (from other parts) it can be a full column.
column_to_add.column = column_to_add.type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
res.insert(std::move(column_to_add));
}
}
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
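A sketch of the nested-default case inside fillMissingColumns: a missing Array column must reuse the offsets of a sibling array from the same nested table, so per-row lengths line up, while only the values are defaults. Simplified types of ours, not the commit's ColumnArray API:
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>
struct FakeArray
{
    std::vector<uint64_t> offsets;  // cumulative end offsets per row
    std::vector<int> data;          // flattened nested values
};
// Build a default-valued array column that shares the sibling's offsets.
FakeArray makeDefaultArray(const std::vector<uint64_t> & shared_offsets, int default_value)
{
    FakeArray col;
    col.offsets = shared_offsets;
    // Total nested elements = last offset; fill them all with the default value.
    std::size_t nested_rows = shared_offsets.empty() ? 0 : shared_offsets.back();
    col.data.assign(nested_rows, default_value);
    return col;
}
int main()
{
    // Sibling array had rows of lengths 2 and 3 -> offsets {2, 5}.
    FakeArray missing = makeDefaultArray({2, 5}, 0);
    assert(missing.data.size() == 5);
}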
void IMergeTreeReader::reorderColumns(Block & res, const Names & ordered_names, const String * filter_name)
{
try
{
Block ordered_block;
for (const auto & name : ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (filter_name && !ordered_block.has(*filter_name) && res.has(*filter_name))
ordered_block.insert(res.getByName(*filter_name));
std::swap(res, ordered_block);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
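A minimal sketch of the reordering contract above: keep exactly the requested names, in their given order, then append the filter column if it would otherwise be dropped (container types simplified; not the commit's Block API):
#include <iostream>
#include <string>
#include <utility>
#include <vector>
using FakeBlock = std::vector<std::pair<std::string, int>>;
bool has(const FakeBlock & b, const std::string & name)
{
    for (const auto & c : b) if (c.first == name) return true;
    return false;
}
int get(const FakeBlock & b, const std::string & name)
{
    for (const auto & c : b) if (c.first == name) return c.second;
    return 0;
}
FakeBlock reorder(const FakeBlock & res, const std::vector<std::string> & ordered, const std::string * filter)
{
    FakeBlock out;
    for (const auto & name : ordered)
        if (has(res, name))
            out.push_back({name, get(res, name)});
    // The filter column survives even when it isn't in the requested order.
    if (filter && !has(out, *filter) && has(res, *filter))
        out.push_back({*filter, get(res, *filter)});
    return out;
}
int main()
{
    FakeBlock b = {{"b", 2}, {"a", 1}, {"f", 9}};
    std::string filter = "f";
    for (const auto & c : reorder(b, {"a", "b"}, &filter))
        std::cout << c.first << ' ';   // a b f
    std::cout << '\n';
}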
void IMergeTreeReader::evaluateMissingDefaults(Block & res)
{
try
{
DB::evaluateMissingDefaults(res, columns, storage.getColumns().getDefaults(), storage.global_context);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
}

View File

@ -2,7 +2,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <port/clock.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
namespace DB
@ -13,24 +13,25 @@ class IDataType;
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
class MergeTreeReader : private boost::noncopyable
class IMergeTreeReader : private boost::noncopyable
{
public:
using ValueSizeMap = std::map<std::string, double>;
using DeserializeBinaryBulkStateMap = std::map<std::string, IDataType::DeserializeBinaryBulkStatePtr>;
MergeTreeReader(const String & path_, /// Path to the directory containing the part
const MergeTreeData::DataPartPtr & data_part_, const NamesAndTypesList & columns_,
IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
bool save_marks_in_cache_,
const MergeTreeData & storage_, const MarkRanges & all_mark_ranges_,
size_t aio_threshold_, size_t max_read_buffer_size_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
const MarkRanges & all_mark_ranges_,
const ReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{});
~MergeTreeReader();
/// Returns the number of rows read, or zero if there are no columns to read.
/// If continue_reading is true, continues reading from the last state; otherwise seeks to from_mark.
virtual size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) = 0;
virtual ~IMergeTreeReader();
const ValueSizeMap & getAvgValueSizeHints() const;
@ -47,19 +48,14 @@ public:
const NamesAndTypesList & getColumns() const { return columns; }
/// Returns the number of rows read, or zero if there are no columns to read.
/// If continue_reading is true, continues reading from the last state; otherwise seeks to from_mark.
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
MergeTreeData::DataPartPtr data_part;
size_t getFirstMarkToRead() const
{
return all_mark_ranges.back().begin;
}
private:
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
protected:
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk
@ -67,28 +63,17 @@ private:
/// Path to the directory containing the part
String path;
FileStreams streams;
/// Columns that are read.
NamesAndTypesList columns;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
/// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data.
bool save_marks_in_cache;
ReaderSettings settings;
const MergeTreeData & storage;
MarkRanges all_mark_ranges;
size_t aio_threshold;
size_t max_read_buffer_size;
void addStreams(const String & name, const IDataType & type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool read_offsets = true);
friend class MergeTreeRangeReader::DelayedStream;

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Columns/FilterDescription.h>
#include <Columns/ColumnArray.h>
@ -25,10 +26,8 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
UInt64 min_bytes_to_use_direct_io_,
UInt64 max_read_buffer_size_,
const ReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
bool save_marks_in_cache_,
const Names & virt_column_names_)
:
storage(storage_),
@ -36,10 +35,8 @@ MergeTreeBaseSelectBlockInputStream::MergeTreeBaseSelectBlockInputStream(
max_block_size_rows(max_block_size_rows_),
preferred_block_size_bytes(preferred_block_size_bytes_),
preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_),
min_bytes_to_use_direct_io(min_bytes_to_use_direct_io_),
max_read_buffer_size(max_read_buffer_size_),
reader_settings(reader_settings_),
use_uncompressed_cache(use_uncompressed_cache_),
save_marks_in_cache(save_marks_in_cache_),
virt_column_names(virt_column_names_)
{
}

View File

@ -8,7 +8,7 @@
namespace DB
{
class MergeTreeReader;
class IMergeTreeReader;
class UncompressedCache;
class MarkCache;
@ -23,10 +23,8 @@ public:
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
UInt64 preferred_max_column_in_block_size_bytes_,
UInt64 min_bytes_to_use_direct_io_,
UInt64 max_read_buffer_size_,
const ReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
bool save_marks_in_cache_ = true,
const Names & virt_column_names_ = {});
~MergeTreeBaseSelectBlockInputStream() override;
@ -61,11 +59,9 @@ protected:
UInt64 preferred_block_size_bytes;
UInt64 preferred_max_column_in_block_size_bytes;
UInt64 min_bytes_to_use_direct_io;
UInt64 max_read_buffer_size;
ReaderSettings reader_settings;
bool use_uncompressed_cache;
bool save_marks_in_cache;
Names virt_column_names;
@ -74,7 +70,7 @@ protected:
std::shared_ptr<UncompressedCache> owned_uncompressed_cache;
std::shared_ptr<MarkCache> owned_mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<MergeTreeReader>;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader;
MergeTreeReaderPtr pre_reader;
};

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/MergeTreeReaderSettings.h>
namespace DB

View File

@ -5,6 +5,7 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/AlterCommands.h>
@ -810,8 +811,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
return;
MutableDataPartPtr part = std::make_shared<DataPart>(*this, part_disk_ptr, part_name, part_info);
part->relative_path = part_name;
MutableDataPartPtr part = createPart(*this, part_disk_ptr, part_name, part_info, part_name);
bool broken = false;
try
@ -1738,7 +1738,7 @@ void MergeTreeData::checkSettingCanBeChanged(const String & setting_name) const
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
{
auto & empty_columns = data_part->empty_columns;
auto & empty_columns = data_part->expired_columns;
if (empty_columns.empty())
return;
@ -2126,14 +2126,14 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect
for (const DataPartPtr & part : remove)
{
if (part->state == MergeTreeDataPart::State::Committed)
if (part->state == IMergeTreeDataPart::State::Committed)
removePartContributionToColumnSizes(part);
if (part->state == MergeTreeDataPart::State::Committed || clear_without_timeout)
if (part->state == IMergeTreeDataPart::State::Committed || clear_without_timeout)
part->remove_time.store(remove_time, std::memory_order_relaxed);
if (part->state != MergeTreeDataPart::State::Outdated)
modifyPartState(part, MergeTreeDataPart::State::Outdated);
if (part->state != IMergeTreeDataPart::State::Outdated)
modifyPartState(part, IMergeTreeDataPart::State::Outdated);
}
}
@ -2583,8 +2583,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskSpace::DiskPtr & disk, const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this, disk, Poco::Path(relative_path).getFileName());
part->relative_path = relative_path;
MutableDataPartPtr part = createPart(*this, disk, Poco::Path(relative_path).getFileName(), relative_path);
loadPartAndFixMetadata(part);
return part;
}
@ -3022,8 +3021,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_names : renamed_parts.old_and_new_names)
{
LOG_DEBUG(log, "Checking part " << part_names.second);
MutableDataPartPtr part = std::make_shared<DataPart>(*this, name_to_disk[part_names.first], part_names.first);
part->relative_path = source_dir + part_names.second;
MutableDataPartPtr part = createPart(
*this, name_to_disk[part_names.first], part_names.first, source_dir + part_names.second);
loadPartAndFixMetadata(part);
loaded_parts.push_back(part);
}
@ -3235,10 +3234,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
localBackup(src_part_absolute_path, dst_part_absolute_path);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(
*this, reservation->getDisk(), dst_part_name, dst_part_info);
MergeTreeData::MutableDataPartPtr dst_data_part = createPart(
*this, reservation->getDisk(), dst_part_name, dst_part_info, tmp_dst_part_name);
dst_data_part->relative_path = tmp_dst_part_name;
dst_data_part->is_temp = true;
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
@ -3395,7 +3393,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::CurrentlyMovingPartsTagger(MergeTreeM
: parts_to_move(std::move(moving_parts_)), data(data_)
{
for (const auto & moving_part : parts_to_move)
if (!data.currently_moving_parts.emplace(moving_part.part).second)
if (!data.currently_moving_parts.emplace(moving_part.part->name).second)
throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
}
@ -3405,9 +3403,9 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
for (const auto & moving_part : parts_to_move)
{
/// Something went completely wrong
if (!data.currently_moving_parts.count(moving_part.part))
if (!data.currently_moving_parts.count(moving_part.part->name))
std::terminate();
data.currently_moving_parts.erase(moving_part.part);
data.currently_moving_parts.erase(moving_part.part->name);
}
}
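The hunks above switch currently_moving_parts from a set of part pointers to a NameSet keyed by part name. A hedged RAII sketch of the tagger pattern shown in the constructor/destructor pair (simplified types, not the commit's code):
#include <cstdlib>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>
using NameSet = std::set<std::string>;
// RAII guard: registers part names on construction, unregisters on destruction.
struct MovingPartsTagger
{
    std::vector<std::string> parts;
    NameSet & currently_moving;
    MovingPartsTagger(std::vector<std::string> parts_, NameSet & moving)
        : parts(std::move(parts_)), currently_moving(moving)
    {
        for (const auto & name : parts)
            if (!currently_moving.emplace(name).second)
                throw std::logic_error("Cannot move part '" + name + "'. It's already moving.");
    }
    ~MovingPartsTagger()
    {
        for (const auto & name : parts)
        {
            if (!currently_moving.count(name))
                std::abort();  // something went completely wrong
            currently_moving.erase(name);
        }
    }
};
int main()
{
    NameSet moving;
    {
        MovingPartsTagger tagger({"all_1_1_0"}, moving);
        // while the tagger lives, the part cannot be scheduled for another move
    }
    return moving.empty() ? 0 : 1;
}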
@ -3446,7 +3444,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::selectPartsForMove()
*reason = "part already assigned to background operation.";
return false;
}
if (currently_moving_parts.count(part))
if (currently_moving_parts.count(part->name))
{
*reason = "part is already moving.";
return false;
@ -3480,7 +3478,7 @@ MergeTreeData::CurrentlyMovingPartsTagger MergeTreeData::checkPartsForMove(const
"Move is not possible: " + path_to_clone + part->name + " already exists",
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part))
if (currently_moving_parts.count(part->name) || partIsAssignedToBackgroundOperation(part))
throw Exception(
"Cannot move part '" + part->name + "' because it's participating in background process",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);

View File

@ -15,7 +15,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h>
@ -78,7 +78,7 @@ namespace ErrorCodes
/// The same files as for month-partitioned tables, plus
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see IMergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
///
/// Several modes are implemented. Modes determine additional actions during merge:
/// - Ordinary - don't do anything special
@ -101,14 +101,14 @@ class MergeTreeData : public IStorage
public:
/// Function to call if the part is suspected to contain corrupt data.
using BrokenPartCallback = std::function<void (const String &)>;
using DataPart = MergeTreeDataPart;
using DataPart = IMergeTreeDataPart;
using MutableDataPartPtr = std::shared_ptr<DataPart>;
using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
/// After the DataPart is added to the working set, it cannot be changed.
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartState = MergeTreeDataPart::State;
using DataPartState = IMergeTreeDataPart::State;
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
@ -737,14 +737,15 @@ public:
/// if we decide to move some part to another disk, then we
/// will assuredly choose this disk to contain the part that will appear
/// as a result of a merge or mutation.
DataParts currently_moving_parts;
NameSet currently_moving_parts;
/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
protected:
friend struct MergeTreeDataPart;
friend class IMergeTreeDataPart;
friend class MergeTreeDataPartWide;
friend class MergeTreeDataMergerMutator;
friend class ReplicatedMergeTreeAlterThread;
friend struct ReplicatedMergeTreeTableMetadata;

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
@ -552,8 +553,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
for (const MergeTreeData::DataPartPtr & part : parts)
part->accumulateColumnSizes(merged_column_to_size);
// for (const MergeTreeData::DataPartPtr & part : parts)
// part->accumulateColumnSizes(merged_column_to_size);
Names all_column_names = data.getColumns().getNamesOfPhysical();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
@ -566,10 +567,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
all_columns, data.sorting_key_expr, data.skip_indices,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, space_reservation->getDisk(), future_part.name, future_part.part_info);
MergeTreeData::MutableDataPartPtr new_data_part = createPart(
data, space_reservation->getDisk(), future_part.name, future_part.part_info, TMP_PREFIX + future_part.name);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true;
size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
@ -922,7 +923,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
CurrentMetrics::Increment num_mutations{CurrentMetrics::PartMutation};
const auto & source_part = future_part.parts[0];
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);
auto context_for_reading = context;
context_for_reading.getSettingsRef().merge_tree_uniform_read_distribution = 0;
@ -949,9 +950,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, space_reservation->getDisk(), future_part.name, future_part.part_info);
new_data_part->relative_path = "tmp_mut_" + future_part.name;
MergeTreeData::MutableDataPartPtr new_data_part = createPart(
data, space_reservation->getDisk(), future_part.name, future_part.part_info, "tmp_mut_" + future_part.name);
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
new_data_part->index_granularity_info = source_part->index_granularity_info;
@ -988,7 +989,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, data.primary_key_and_skip_indices_expr));
MergeTreeDataPart::MinMaxIndex minmax_idx;
IMergeTreeDataPart::MinMaxIndex minmax_idx;
MergedBlockOutputStream out(data, new_part_tmp_path, all_columns, compression_codec);

View File

@ -0,0 +1,114 @@
#pragma once
#include <Core/Row.h>
#include <Core/Block.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Columns/IColumn.h>
#include <Poco/Path.h>
#include <shared_mutex>
namespace DB
{
struct ColumnSize;
class MergeTreeData;
/// Description of the data part.
class MergeTreeDataPartCompact : public IMergeTreeDataPart
{
public:
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPartCompact(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskSpace::DiskPtr & disk,
const std::optional<String> & relative_path = {});
MergeTreeDataPartCompact(
MergeTreeData & storage_,
const String & name_,
const DiskSpace::DiskPtr & disk,
const std::optional<String> & relative_path = {});
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const ReaderSettings & reader_settings_,
const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{}) const override;
bool isStoredOnDisk() const override { return true; }
void remove() const override;
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
ColumnSize getColumnSize(const String & name, const IDataType & type) const override;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency) override;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const override;
virtual Type getType() const override { return Type::WIDE; }
~MergeTreeDataPartCompact() override;
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSizeOnDisk(const String & from);
private:
/// Reads columns names and types from columns.txt
void loadColumns(bool require);
/// If checksums.txt exists, reads files' checksums (and sizes) from it
void loadChecksums(bool require);
/// Loads marks index granularity into memory
void loadIndexGranularity();
/// Loads index file.
void loadIndex();
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
/// Loads TTL infos in JSON format from the file ttl.txt. If the file doesn't exist, assigns TTL infos with all zeros
void loadTTLInfos();
void loadPartitionAndMinMaxIndex();
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const override;
void checkConsistency(bool require_part_metadata);
};
// using MergeTreeDataPartState = IMergeTreeDataPart::State;
}

View File

@ -0,0 +1,16 @@
#include "MergeTreeDataPartFactory.h"
namespace DB
{
std::shared_ptr<IMergeTreeDataPart> createPart(const MergeTreeData & storage, const DiskSpace::DiskPtr & disk, const String & name,
const MergeTreePartInfo & info, const String & relative_path)
{
return std::make_shared<MergeTreeDataPartWide>(storage, name, info, disk, relative_path);
}
std::shared_ptr<IMergeTreeDataPart> createPart(MergeTreeData & storage, const DiskSpace::DiskPtr & disk,
const String & name, const String & relative_path)
{
return std::make_shared<MergeTreeDataPartWide>(storage, name, disk, relative_path);
}
}
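The factory centralizes construction, so call sites no longer pair make_shared with a manual relative_path assignment (compare the hunks in DataPartsExchange.cpp and MergeTreeData.cpp above). A minimal sketch of that dispatch shape — today it always returns the Wide part; everything below except the createPart name is a simplified stand-in:
#include <iostream>
#include <memory>
#include <string>
// Simplified stand-ins for the part hierarchy; only the factory shape matters here.
struct IPart
{
    std::string name, relative_path;
    IPart(std::string name_, std::string rel) : name(std::move(name_)), relative_path(std::move(rel)) {}
    virtual std::string type() const = 0;
    virtual ~IPart() = default;
};
struct WidePart : IPart
{
    using IPart::IPart;
    std::string type() const override { return "Wide"; }
};
// Mirrors createPart: one place decides the concrete class and fills relative_path,
// instead of every call site doing make_shared + manual field assignment.
std::shared_ptr<IPart> createPart(const std::string & name, const std::string & relative_path)
{
    return std::make_shared<WidePart>(name, relative_path);
}
int main()
{
    auto part = createPart("all_1_1_0", "tmp_all_1_1_0");
    std::cout << part->type() << ' ' << part->relative_path << '\n';  // Wide tmp_all_1_1_0
}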

View File

@ -0,0 +1,9 @@
#pragma once
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
namespace DB
{
std::shared_ptr<IMergeTreeDataPart> createPart(const MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_, const MergeTreePartInfo & info_, const String & relative_path);
std::shared_ptr<IMergeTreeDataPart> createPart(MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_, const String & relative_path);
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Core/Row.h>
#include <Core/Block.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Columns/IColumn.h>
#include <Poco/Path.h>
#include <shared_mutex>
namespace DB
{
class MergeTreeDataPartOnDisk : public IMergeTreeDataPart
{
};
}

View File

@ -1,4 +1,4 @@
#include "MergeTreeDataPart.h"
#include "MergeTreeDataPartWide.h"
#include <optional>
#include <IO/ReadHelpers.h>
@ -22,9 +22,17 @@
#include <common/logger_useful.h>
#include <common/JSON.h>
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
namespace DB
{
// namespace
// {
// }
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
@ -43,123 +51,42 @@ static ReadBufferFromFile openForReading(const String & path)
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
}
void MergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const String & part_path)
{
size_t minmax_idx_size = data.minmax_idx_column_types.size();
parallelogram.reserve(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
String file_name = part_path + "minmax_" + escapeForFileName(data.minmax_idx_columns[i]) + ".idx";
ReadBufferFromFile file = openForReading(file_name);
const DataTypePtr & type = data.minmax_idx_column_types[i];
Field min_val;
type->deserializeBinary(min_val, file);
Field max_val;
type->deserializeBinary(max_val, file);
parallelogram.emplace_back(min_val, true, max_val, true);
}
initialized = true;
}
void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & data, const String & part_path, Checksums & out_checksums) const
{
store(data.minmax_idx_columns, data.minmax_idx_column_types, part_path, out_checksums);
}
void MergeTreeDataPart::MinMaxIndex::store(const Names & column_names, const DataTypes & data_types, const String & part_path, Checksums & out_checksums) const
{
if (!initialized)
throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < column_names.size(); ++i)
{
String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
const DataTypePtr & type = data_types.at(i);
WriteBufferFromFile out(part_path + file_name);
HashingWriteBuffer out_hashing(out);
type->serializeBinary(parallelogram[i].left, out_hashing);
type->serializeBinary(parallelogram[i].right, out_hashing);
out_hashing.next();
out_checksums.files[file_name].file_size = out_hashing.count();
out_checksums.files[file_name].file_hash = out_hashing.getHash();
}
}
void MergeTreeDataPart::MinMaxIndex::update(const Block & block, const Names & column_names)
{
if (!initialized)
parallelogram.reserve(column_names.size());
for (size_t i = 0; i < column_names.size(); ++i)
{
Field min_value;
Field max_value;
const ColumnWithTypeAndName & column = block.getByName(column_names[i]);
column.column->getExtremes(min_value, max_value);
if (!initialized)
parallelogram.emplace_back(min_value, true, max_value, true);
else
{
parallelogram[i].left = std::min(parallelogram[i].left, min_value);
parallelogram[i].right = std::max(parallelogram[i].right, max_value);
}
}
initialized = true;
}
void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
{
if (!other.initialized)
return;
if (!initialized)
{
parallelogram = other.parallelogram;
initialized = true;
}
else
{
for (size_t i = 0; i < parallelogram.size(); ++i)
{
parallelogram[i].left = std::min(parallelogram[i].left, other.parallelogram[i].left);
parallelogram[i].right = std::max(parallelogram[i].right, other.parallelogram[i].right);
}
}
}
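The update/merge logic above keeps one [min, max] interval per indexed column and only ever widens it: update() folds in the extremes of a new block, merge() folds in another part's index. A self-contained toy model of that behaviour, with std::pair<int, int> standing in for the Field-based ranges:
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>
struct ToyMinMax
{
    bool initialized = false;
    std::vector<std::pair<int, int>> ranges;  // stands in for `parallelogram`
    void update(const std::vector<std::pair<int, int>> & block_extremes)
    {
        if (!initialized)
            ranges = block_extremes;  // first block defines the intervals
        else
            for (size_t i = 0; i < ranges.size(); ++i)
            {
                ranges[i].first = std::min(ranges[i].first, block_extremes[i].first);
                ranges[i].second = std::max(ranges[i].second, block_extremes[i].second);
            }
        initialized = true;
    }
    void merge(const ToyMinMax & other)
    {
        if (other.initialized)
            update(other.ranges);  // merging is the same widening step
    }
};
int main()
{
    ToyMinMax a;
    a.update({{5, 10}});  // interval starts as the block extremes: [5, 10]
    a.update({{1, 7}});   // widens to [1, 10]
    ToyMinMax b;
    b.update({{0, 3}});
    a.merge(b);           // merged parts cover the union: [0, 10]
    assert(a.ranges[0] == std::make_pair(0, 10));
}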
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskSpace::DiskPtr & disk_, const String & name_)
: storage(storage_)
, disk(disk_)
, name(name_)
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, index_granularity_info(storage)
MergeTreeDataPartWide::MergeTreeDataPartWide(
MergeTreeData & storage_,
const String & name_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, disk_, relative_path_)
{
}
MergeTreeDataPart::MergeTreeDataPart(
const MergeTreeData & storage_,
const DiskSpace::DiskPtr & disk_,
const String & name_,
const MergeTreePartInfo & info_)
: storage(storage_)
, disk(disk_)
, name(name_)
, info(info_)
, index_granularity_info(storage)
MergeTreeDataPartWide::MergeTreeDataPartWide(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskSpace::DiskPtr & disk_,
const std::optional<String> & relative_path_)
: IMergeTreeDataPart(storage_, name_, info_, disk_, relative_path_)
{
}
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartWide::getReader(
const NamesAndTypesList & columns_to_read,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const ReaderSettings & reader_settings,
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
return std::make_unique<MergeTreeReaderWide>(shared_from_this(), columns_to_read, uncompressed_cache,
mark_cache, mark_ranges, reader_settings, avg_value_size_hints, profile_callback);
}
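This override is the heart of the refactoring: callers hold an IMergeTreeDataPart and obtain a reader through a virtual call, so the on-disk format stays hidden (later in this commit the select streams switch from constructing MergeTreeReader directly to data_part->getReader(...)). A minimal stand-in model of the dispatch:
#include <iostream>
#include <memory>
// Stand-ins for IMergeTreeReader / IMergeTreeDataPart.
struct IReader
{
    virtual ~IReader() = default;
    virtual const char * kind() const = 0;
};
struct WideReader : IReader
{
    const char * kind() const override { return "MergeTreeReaderWide"; }
};
struct IPart
{
    virtual ~IPart() = default;
    virtual std::unique_ptr<IReader> getReader() const = 0;  // per-format factory method
};
struct WidePart : IPart
{
    std::unique_ptr<IReader> getReader() const override { return std::make_unique<WideReader>(); }
};
int main()
{
    std::unique_ptr<IPart> part = std::make_unique<WidePart>();
    std::cout << part->getReader()->kind() << "\n";  // caller never names the format
}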
/// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once.
ColumnSize MergeTreeDataPart::getColumnSizeImpl(
ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
const String & column_name, const IDataType & type, std::unordered_set<String> * processed_substreams) const
{
ColumnSize size;
@@ -188,36 +115,15 @@ ColumnSize MergeTreeDataPart::getColumnSizeImpl(
return size;
}
ColumnSize MergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const
ColumnSize MergeTreeDataPartWide::getColumnSize(const String & column_name, const IDataType & type) const
{
return getColumnSizeImpl(column_name, type, nullptr);
}
ColumnSize MergeTreeDataPart::getTotalColumnsSize() const
{
ColumnSize totals;
std::unordered_set<String> processed_substreams;
for (const NameAndTypePair & column : columns)
{
ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
totals.add(size);
}
return totals;
}
size_t MergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
{
auto checksum = checksums.files.find(file_name);
if (checksum == checksums.files.end())
return 0;
return checksum->second.file_size;
}
/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
* If no checksums are present returns the name of the first physically existing column.
*/
String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
String MergeTreeDataPartWide::getColumnNameWithMinumumCompressedSize() const
{
const auto & storage_columns = storage.getColumns().getAllPhysical();
const std::string * minimum_size_column = nullptr;
@@ -242,77 +148,7 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
return *minimum_size_column;
}
String MergeTreeDataPart::getFullPath() const
{
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
return storage.getFullPathOnDisk(disk) + relative_path + "/";
}
String MergeTreeDataPart::getNameWithPrefix() const
{
String res = Poco::Path(relative_path).getFileName();
if (res.empty())
throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR);
return res;
}
String MergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) const
{
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
/// NOTE: getting min and max dates from the part name (instead of part data) because we want
/// the merged part name be determined only by source part names.
/// It is simpler this way when the real min and max dates for the block range can change
/// (e.g. after an ALTER DELETE command).
DayNum min_date;
DayNum max_date;
MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
return new_part_info.getPartNameV0(min_date, max_date);
}
else
return new_part_info.getPartName();
}
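For reference, the two naming schemes this branch distinguishes (example names are illustrative, following the MergeTreePartInfo conventions used above):
/// format_version == 0: the name itself encodes the date range:
///     <minDate>_<maxDate>_<minBlock>_<maxBlock>_<level>, e.g. 20190101_20190131_1_10_2
/// custom partitioning (newer format):
///     <partitionID>_<minBlock>_<maxBlock>_<level>, e.g. 201901_1_10_2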
DayNum MergeTreeDataPart::getMinDate() const
{
if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].left.get<UInt64>());
else
return DayNum();
}
DayNum MergeTreeDataPart::getMaxDate() const
{
if (storage.minmax_idx_date_column_pos != -1 && minmax_idx.initialized)
return DayNum(minmax_idx.parallelogram[storage.minmax_idx_date_column_pos].right.get<UInt64>());
else
return DayNum();
}
time_t MergeTreeDataPart::getMinTime() const
{
if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].left.get<UInt64>();
else
return 0;
}
time_t MergeTreeDataPart::getMaxTime() const
{
if (storage.minmax_idx_time_column_pos != -1 && minmax_idx.initialized)
return minmax_idx.parallelogram[storage.minmax_idx_time_column_pos].right.get<UInt64>();
else
return 0;
}
MergeTreeDataPart::~MergeTreeDataPart()
MergeTreeDataPartWide::~MergeTreeDataPartWide()
{
if (state == State::DeleteOnDestroy || is_temp)
{
@@ -326,7 +162,12 @@ MergeTreeDataPart::~MergeTreeDataPart()
if (is_temp)
{
if (!startsWith(getNameWithPrefix(), "tmp"))
String file_name = Poco::Path(relative_path).getFileName();
if (file_name.empty())
throw Exception("relative_path " + relative_path + " of part " + name + " is invalid or not set", ErrorCodes::LOGICAL_ERROR);
if (!startsWith(file_name, "tmp"))
{
LOG_ERROR(storage.log, "~DataPart() should remove part " << path
<< " but its name doesn't start with tmp. Too suspicious, keeping the part.");
@@ -343,7 +184,7 @@ MergeTreeDataPart::~MergeTreeDataPart()
}
}
UInt64 MergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
UInt64 MergeTreeDataPartWide::calculateTotalSizeOnDisk(const String & from)
{
Poco::File cur(from);
if (cur.isFile())
@@ -356,7 +197,7 @@ UInt64 MergeTreeDataPart::calculateTotalSizeOnDisk(const String & from)
return res;
}
void MergeTreeDataPart::remove() const
void MergeTreeDataPartWide::remove() const
{
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. This is bug.", ErrorCodes::LOGICAL_ERROR);
@@ -453,104 +294,7 @@ void MergeTreeDataPart::remove() const
}
}
void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
{
String from = getFullPath();
String to = storage.getFullPathOnDisk(disk) + new_relative_path + "/";
Poco::File from_file(from);
if (!from_file.exists())
throw Exception("Part directory " + from + " doesn't exist. Most likely it is logical error.", ErrorCodes::FILE_DOESNT_EXIST);
Poco::File to_file(to);
if (to_file.exists())
{
if (remove_new_dir_if_exists)
{
Names files;
Poco::File(from).list(files);
LOG_WARNING(storage.log, "Part directory " << to << " already exists"
<< " and contains " << files.size() << " files. Removing it.");
to_file.remove(true);
}
else
{
throw Exception("Part directory " + to + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
}
}
from_file.setLastModified(Poco::Timestamp::fromEpochTime(time(nullptr)));
from_file.renameTo(to);
relative_path = new_relative_path;
}
String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
/// Do not allow underscores in the prefix because they are used as separators.
assert(prefix.find_first_of('_') == String::npos);
String res;
/** If you need to detach a part, and the directory into which we want to rename it already exists,
* we will rename it to a directory whose name has a suffix of the form "_tryN" appended.
* This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
* No more than 10 attempts are made so that there are not too many junk directories left.
*/
for (int try_no = 0; try_no < 10; try_no++)
{
res = "detached/" + (prefix.empty() ? "" : prefix + "_")
+ name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
return res;
LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
" Will detach to directory with '_tryN' suffix.");
}
return res;
}
void MergeTreeDataPart::renameToDetached(const String & prefix) const
{
renameTo(getRelativePathForDetachedPart(prefix));
}
UInt64 MergeTreeDataPart::getMarksCount() const
{
return index_granularity.getMarksCount();
}
void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
Poco::Path src(getFullPath());
Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(src, dst, 0);
}
void MergeTreeDataPart::makeCloneOnDiskDetached(const DiskSpace::ReservationPtr & reservation) const
{
auto & reserved_disk = reservation->getDisk();
if (reserved_disk->getName() == disk->getName())
throw Exception("Can not clone data part " + name + " to same disk " + disk->getName(), ErrorCodes::LOGICAL_ERROR);
String path_to_clone = storage.getFullPathOnDisk(reserved_disk) + "detached/";
if (Poco::File(path_to_clone + relative_path).exists())
throw Exception("Path " + path_to_clone + relative_path + " already exists. Can not clone ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
Poco::File(path_to_clone).createDirectory();
Poco::File cloning_directory(getFullPath());
cloning_directory.copyTo(path_to_clone);
}
void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
void MergeTreeDataPartWide::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool /* check_consistency */)
{
/// Memory should not be limited during ATTACH TABLE query.
/// This is already true at the server startup but must also be ensured for manual table ATTACH.
@@ -564,13 +308,12 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `index_granularity`.
loadPartitionAndMinMaxIndex();
loadTTLInfos();
if (check_consistency)
checkConsistency(require_columns_checksums);
// if (check_consistency)
// checkConsistency(require_columns_checksums);
}
void MergeTreeDataPart::loadIndexGranularity()
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullPath();
index_granularity_info.changeGranularityIfRequired(full_path);
@@ -606,7 +349,7 @@ void MergeTreeDataPart::loadIndexGranularity()
index_granularity.setInitialized();
}
void MergeTreeDataPart::loadIndex()
void MergeTreeDataPartWide::loadIndex()
{
/// It can be empty in case of mutations
if (!index_granularity.isInitialized())
@@ -648,7 +391,7 @@ void MergeTreeDataPart::loadIndex()
}
}
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
void MergeTreeDataPartWide::loadPartitionAndMinMaxIndex()
{
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
@@ -676,7 +419,7 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
ErrorCodes::CORRUPTED_DATA);
}
void MergeTreeDataPart::loadChecksums(bool require)
void MergeTreeDataPartWide::loadChecksums(bool require)
{
String path = getFullPath() + "checksums.txt";
Poco::File checksums_file(path);
@@ -700,7 +443,7 @@ void MergeTreeDataPart::loadChecksums(bool require)
}
}
void MergeTreeDataPart::loadRowsCount()
void MergeTreeDataPartWide::loadRowsCount()
{
if (index_granularity.empty())
{
@@ -754,7 +497,7 @@ void MergeTreeDataPart::loadRowsCount()
}
}
void MergeTreeDataPart::loadTTLInfos()
void MergeTreeDataPartWide::loadTTLInfos()
{
String path = getFullPath() + "ttl.txt";
if (Poco::File(path).exists())
@@ -781,23 +524,23 @@ void MergeTreeDataPart::loadTTLInfos()
}
}
void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
std::shared_lock<std::shared_mutex> part_lock(columns_lock);
// void MergeTreeDataPartWide::accumulateColumnSizes(ColumnToSize & column_to_size) const
// {
// std::shared_lock<std::shared_mutex> part_lock(columns_lock);
for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
{
IDataType::SubstreamPath path;
name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
if (bin_file.exists())
column_to_size[name_type.name] += bin_file.getSize();
}, path);
}
}
// for (const NameAndTypePair & name_type : storage.getColumns().getAllPhysical())
// {
// IDataType::SubstreamPath path;
// name_type.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
// {
// Poco::File bin_file(getFullPath() + IDataType::getFileNameForStream(name_type.name, substream_path) + ".bin");
// if (bin_file.exists())
// column_to_size[name_type.name] += bin_file.getSize();
// }, path);
// }
// }
void MergeTreeDataPart::loadColumns(bool require)
void MergeTreeDataPartWide::loadColumns(bool require)
{
String path = getFullPath() + "columns.txt";
Poco::File poco_file_path{path};
@@ -829,7 +572,7 @@ void MergeTreeDataPart::loadColumns(bool require)
columns.readText(file);
}
void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata)
{
String path = getFullPath();
@@ -932,77 +675,22 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
}
}
bool MergeTreeDataPart::hasColumnFiles(const String & column_name, const IDataType & type) const
{
bool res = true;
// bool MergeTreeDataPartWide::hasColumnFiles(const String & column_name, const IDataType & type) const
// {
// bool res = true;
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
// type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
// {
// String file_name = IDataType::getFileNameForStream(column_name, substream_path);
auto bin_checksum = checksums.files.find(file_name + ".bin");
auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
// auto bin_checksum = checksums.files.find(file_name + ".bin");
// auto mrk_checksum = checksums.files.find(file_name + index_granularity_info.marks_file_extension);
if (bin_checksum == checksums.files.end() || mrk_checksum == checksums.files.end())
res = false;
}, {});
// if (bin_checksum == checksums.files.end() || mrk_checksum == checksums.files.end())
// res = false;
// }, {});
return res;
}
UInt64 MergeTreeDataPart::getIndexSizeInBytes() const
{
UInt64 res = 0;
for (const ColumnPtr & column : index)
res += column->byteSize();
return res;
}
UInt64 MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
{
UInt64 res = 0;
for (const ColumnPtr & column : index)
res += column->allocatedBytes();
return res;
}
String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
{
switch (state)
{
case State::Temporary:
return "Temporary";
case State::PreCommitted:
return "PreCommitted";
case State::Committed:
return "Committed";
case State::Outdated:
return "Outdated";
case State::Deleting:
return "Deleting";
case State::DeleteOnDestroy:
return "DeleteOnDestroy";
}
__builtin_unreachable();
}
String MergeTreeDataPart::stateString() const
{
return stateToString(state);
}
void MergeTreeDataPart::assertState(const std::initializer_list<MergeTreeDataPart::State> & affordable_states) const
{
if (!checkState(affordable_states))
{
String states_str;
for (auto affordable_state : affordable_states)
states_str += stateToString(affordable_state) + " ";
throw Exception("Unexpected state of part " + getNameWithState() + ". Expected: " + states_str, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
}
}
// return res;
// }
}

View File

@@ -0,0 +1,116 @@
#pragma once
#include <Core/Row.h>
#include <Core/Block.h>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Columns/IColumn.h>
#include <Poco/Path.h>
#include <shared_mutex>
namespace DB
{
struct ColumnSize;
class MergeTreeData;
/// Description of the data part.
class MergeTreeDataPartWide : public IMergeTreeDataPart
{
public:
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPartWide(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const DiskSpace::DiskPtr & disk,
const std::optional<String> & relative_path = {});
MergeTreeDataPartWide(
MergeTreeData & storage_,
const String & name_,
const DiskSpace::DiskPtr & disk,
const std::optional<String> & relative_path = {});
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const MarkRanges & mark_ranges,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const ReaderSettings & reader_settings_,
const ValueSizeMap & avg_value_size_hints = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback = ReadBufferFromFileBase::ProfileCallback{}) const override;
bool isStoredOnDisk() const override { return true; }
void remove() const override;
bool supportsVerticalMerge() const override { return true; }
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).
ColumnSize getColumnSize(const String & name, const IDataType & type) const override;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency) override;
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const override;
virtual Type getType() const override { return Type::WIDE; }
~MergeTreeDataPartWide() override;
/// Calculate the total size of the entire directory with all the files
static UInt64 calculateTotalSizeOnDisk(const String & from);
private:
/// Reads columns names and types from columns.txt
void loadColumns(bool require);
/// If checksums.txt exists, reads files' checksums (and sizes) from it
void loadChecksums(bool require);
/// Loads marks index granularity into memory
void loadIndexGranularity();
/// Loads index file.
void loadIndex();
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
/// Loads ttl infos in json format from file ttl.txt. If the file doesn't exist, assigns ttl infos with all zeros
void loadTTLInfos();
void loadPartitionAndMinMaxIndex();
ColumnSize getColumnSizeImpl(const String & name, const IDataType & type, std::unordered_set<String> * processed_substreams) const override;
void checkConsistency(bool require_part_metadata);
};
// using MergeTreeDataPartState = IMergeTreeDataPart::State;
}

View File

@@ -567,6 +567,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
BlockInputStreams res;
ReaderSettings reader_settings =
{
.min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
.max_read_buffer_size = settings.max_read_buffer_size,
.save_marks_in_cache = true
};
if (select.final())
{
/// Add columns needed to calculate the sorting expression and the sign.
@@ -588,7 +595,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
settings.use_uncompressed_cache,
query_info,
virt_column_names,
settings);
settings,
reader_settings);
}
else if (settings.optimize_read_in_order && query_info.sorting_info)
{
@@ -608,7 +616,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
query_info,
sorting_key_prefix_expr,
virt_column_names,
settings);
settings,
reader_settings);
}
else
{
@@ -620,7 +629,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
settings.use_uncompressed_cache,
query_info,
virt_column_names,
settings);
settings,
reader_settings);
}
if (use_sampling)
@@ -666,7 +676,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const
const Settings & settings,
const ReaderSettings & reader_settings) const
{
/// Count marks for each part.
std::vector<size_t> sum_marks_in_parts(parts.size());
@@ -727,7 +738,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::make_shared<MergeTreeThreadSelectBlockInputStream>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
query_info.prewhere_info, settings, virt_columns));
query_info.prewhere_info, reader_settings, virt_columns));
if (i == 0)
{
@@ -803,8 +814,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
res.push_back(source_stream);
}
@@ -826,7 +837,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings) const
const Settings & settings,
const ReaderSettings & reader_settings) const
{
size_t sum_marks = 0;
SortingInfoPtr sorting_info = query_info.sorting_info;
@@ -988,16 +1000,16 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithO
source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
}
else
{
source_stream = std::make_shared<MergeTreeReverseSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
source_stream = std::make_shared<ReverseBlockInputStream>(source_stream);
}
@@ -1033,7 +1045,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const
const Settings & settings,
const ReaderSettings & reader_settings) const
{
const auto data_settings = data.getSettings();
size_t sum_marks = 0;
@@ -1071,7 +1084,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeSelectBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.sorting_key_expr));

View File

@@ -54,7 +54,8 @@ private:
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const;
const Settings & settings,
const ReaderSettings & reader_settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
@@ -65,7 +66,8 @@ private:
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings) const;
const Settings & settings,
const ReaderSettings & reader_settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
@@ -74,7 +76,8 @@ private:
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings) const;
const Settings & settings,
const ReaderSettings & reader_settings) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
size_t getApproximateTotalRowsToRead(

View File

@@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Interpreters/AggregationCommon.h>
@@ -75,7 +76,7 @@ void buildScatterSelector(
}
/// Computes ttls and updates ttl infos
void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, MergeTreeDataPart::TTLInfos & ttl_infos, Block & block, const String & column_name)
void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, IMergeTreeDataPart::TTLInfos & ttl_infos, Block & block, const String & column_name)
{
if (!block.has(ttl_entry.result_column))
ttl_entry.expression->execute(block);
@@ -173,7 +174,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// This will generate unique name in scope of current server process.
Int64 temp_index = data.insert_increment.get();
MergeTreeDataPart::MinMaxIndex minmax_idx;
IMergeTreeDataPart::MinMaxIndex minmax_idx;
minmax_idx.update(block, data.minmax_idx_columns);
MergeTreePartition partition(std::move(block_with_partition.partition));
@@ -204,11 +205,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name, new_part_info);
createPart(data, reservation->getDisk(), part_name, new_part_info, TMP_PREFIX + part_name);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->relative_path = TMP_PREFIX + part_name;
new_data_part->is_temp = true;
/// The name could be non-unique in case of stale files from previous runs.

View File

@@ -5,7 +5,7 @@
namespace DB
{
/// Class contains information about index granularity in rows of MergeTreeDataPart
/// Class contains information about index granularity in rows of IMergeTreeDataPart
/// Inside it contains vector of partial sums of rows after mark:
/// |-----|---|----|----|
/// | 5 | 8 | 12 | 16 |
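A worked example of the partial-sums representation from the comment above (marks holding 5, 3, 4 and 4 rows give the sums 5, 8, 12, 16):
#include <cassert>
#include <cstddef>
#include <vector>
int main()
{
    // Partial sums of rows after each mark, as in the diagram.
    std::vector<std::size_t> sums = {5, 8, 12, 16};
    // Rows in granule i = sums[i] - sums[i - 1], with sums[-1] taken as 0.
    auto rows_in_granule = [&](std::size_t i) { return sums[i] - (i ? sums[i - 1] : 0); };
    assert(rows_in_granule(0) == 5);
    assert(rows_in_granule(2) == 4);
    assert(sums.back() == 16);  // total rows in the part is the last sum
}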

View File

@@ -8,8 +8,9 @@ MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_)
: index(index_), stream(
part_->getFullPath() + index->getFileName(), ".idx", marks_count_,
all_mark_ranges_, nullptr, false, nullptr,
part_->getFileSizeOrZero(index->getFileName() + ".idx"), 0, DBMS_DEFAULT_BUFFER_SIZE,
all_mark_ranges_,
{0, DBMS_DEFAULT_BUFFER_SIZE, false}, nullptr, nullptr,
part_->getFileSizeOrZero(index->getFileName() + ".idx"),
&part_->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE)
{

View File

@@ -1,6 +1,6 @@
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Common/FieldVisitors.h>

View File

@@ -1,5 +1,7 @@
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartFactory.h>
#include <set>
#include <boost/algorithm/string/join.hpp>
@@ -76,7 +78,7 @@ bool MergeTreePartsMover::selectPartsForMove(
const AllowedMovingPredicate & can_move,
const std::lock_guard<std::mutex> & /* moving_parts_lock */)
{
MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector();
auto data_parts = data->getDataPartsVector();
if (data_parts.empty())
return false;
@@ -108,7 +110,7 @@ bool MergeTreePartsMover::selectPartsForMove(
/// Don't report message to log, because logging is excessive
if (!can_move(part, &reason))
continue;
auto to_insert = need_to_move.find(part->disk);
if (to_insert != need_to_move.end())
to_insert->second.add(part);
@@ -143,8 +145,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space);
MergeTreeData::MutableDataPartPtr cloned_part =
std::make_shared<MergeTreeData::DataPart>(*data, moving_part.reserved_space->getDisk(), moving_part.part->name);
cloned_part->relative_path = "detached/" + moving_part.part->name;
createPart(*data, moving_part.reserved_space->getDisk(), moving_part.part->name, "detached/" + moving_part.part->name);
LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath());
cloned_part->loadColumnsChecksumsIndexes(true, true);

View File

@@ -3,7 +3,7 @@
#include <functional>
#include <vector>
#include <optional>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Common/ActionBlocker.h>
#include <Common/DiskSpaceMonitor.h>
@@ -15,10 +15,10 @@ namespace DB
/// it have to be moved.
struct MergeTreeMoveEntry
{
std::shared_ptr<const MergeTreeDataPart> part;
std::shared_ptr<const IMergeTreeDataPart> part;
DiskSpace::ReservationPtr reserved_space;
MergeTreeMoveEntry(const std::shared_ptr<const MergeTreeDataPart> & part_, DiskSpace::ReservationPtr reservation_)
MergeTreeMoveEntry(const std::shared_ptr<const IMergeTreeDataPart> & part_, DiskSpace::ReservationPtr reservation_)
: part(part_), reserved_space(std::move(reservation_))
{
}
@@ -33,7 +33,7 @@ class MergeTreePartsMover
{
private:
/// Callback tells that part is not participating in background process
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const MergeTreeDataPart> &, String * reason)>;
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const IMergeTreeDataPart> &, String * reason)>;
public:
MergeTreePartsMover(MergeTreeData * data_)
@@ -50,14 +50,14 @@ public:
const std::lock_guard<std::mutex> & moving_parts_lock);
/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
std::shared_ptr<const MergeTreeDataPart> clonePart(const MergeTreeMoveEntry & moving_part) const;
std::shared_ptr<const IMergeTreeDataPart> clonePart(const MergeTreeMoveEntry & moving_part) const;
/// Replaces cloned part from detached directory into active data parts set.
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
/// MergeTreeDataPart called. If replacing part doesn't exists or not active (commited) than
/// IMergeTreeDataPart called. If the replacing part doesn't exist or is not active (committed) then
/// cloned part will be removed and a log message will be reported. It may happen in case of concurrent
/// merge or mutation.
void swapClonedPart(const std::shared_ptr<const MergeTreeDataPart> & cloned_parts) const;
void swapClonedPart(const std::shared_ptr<const IMergeTreeDataPart> & cloned_parts) const;
public:
/// Can stop background moves and moves from queries

View File

@@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Columns/FilterDescription.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnNothing.h>
@@ -13,7 +13,7 @@ namespace DB
{
MergeTreeRangeReader::DelayedStream::DelayedStream(
size_t from_mark, MergeTreeReader * merge_tree_reader_)
size_t from_mark, IMergeTreeReader * merge_tree_reader_)
: current_mark(from_mark), current_offset(0), num_delayed_rows(0)
, merge_tree_reader(merge_tree_reader_)
, index_granularity(&(merge_tree_reader->data_part->index_granularity))
@@ -108,7 +108,7 @@ size_t MergeTreeRangeReader::DelayedStream::finalize(Block & block)
MergeTreeRangeReader::Stream::Stream(
size_t from_mark, size_t to_mark, MergeTreeReader * merge_tree_reader_)
size_t from_mark, size_t to_mark, IMergeTreeReader * merge_tree_reader_)
: current_mark(from_mark), offset_after_current_mark(0)
, last_mark(to_mark)
, merge_tree_reader(merge_tree_reader_)
@@ -406,7 +406,7 @@ void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_,
IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_,
ExpressionActionsPtr alias_actions_, ExpressionActionsPtr prewhere_actions_,
const String * prewhere_column_name_, const Names * ordered_names_,
bool always_reorder_, bool remove_prewhere_column_, bool last_reader_in_chain_)

View File

@@ -11,7 +11,7 @@ template <typename T>
class ColumnVector;
using ColumnUInt8 = ColumnVector<UInt8>;
class MergeTreeReader;
class IMergeTreeReader;
class MergeTreeIndexGranularity;
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
@@ -20,7 +20,7 @@ class MergeTreeIndexGranularity;
class MergeTreeRangeReader
{
public:
MergeTreeRangeReader(MergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_,
MergeTreeRangeReader(IMergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_,
ExpressionActionsPtr alias_actions_, ExpressionActionsPtr prewhere_actions_,
const String * prewhere_column_name_, const Names * ordered_names_,
bool always_reorder_, bool remove_prewhere_column_, bool last_reader_in_chain_);
@@ -41,7 +41,7 @@ public:
{
public:
DelayedStream() = default;
DelayedStream(size_t from_mark, MergeTreeReader * merge_tree_reader);
DelayedStream(size_t from_mark, IMergeTreeReader * merge_tree_reader);
/// Read @num_rows rows from @from_mark starting from @offset row
/// Returns the number of rows added to block.
@@ -62,7 +62,7 @@ public:
size_t num_delayed_rows = 0;
/// Actual reader of data from disk
MergeTreeReader * merge_tree_reader = nullptr;
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
bool continue_reading = false;
bool is_finished = true;
@@ -78,7 +78,7 @@ public:
{
public:
Stream() = default;
Stream(size_t from_mark, size_t to_mark, MergeTreeReader * merge_tree_reader);
Stream(size_t from_mark, size_t to_mark, IMergeTreeReader * merge_tree_reader);
/// Returns the number of rows added to block.
size_t read(Block & block, size_t num_rows, bool skip_remaining_rows_in_current_granule);
@@ -103,7 +103,7 @@ public:
size_t last_mark = 0;
MergeTreeReader * merge_tree_reader = nullptr;
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
size_t current_mark_index_granularity = 0;
@@ -191,7 +191,7 @@ private:
void executePrewhereActionsAndFilterColumns(ReadResult & result);
void filterBlock(Block & block, const IColumn::Filter & filter) const;
MergeTreeReader * merge_tree_reader = nullptr;
IMergeTreeReader * merge_tree_reader = nullptr;
const MergeTreeIndexGranularity * index_granularity = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.

View File

@@ -33,7 +33,7 @@ MergeTreeReadPool::MergeTreeReadPool(
for (auto & part_ranges : parts_ranges)
std::reverse(std::begin(part_ranges.ranges), std::end(part_ranges.ranges));
/// parts don't contain duplicate MergeTreeDataPart's.
/// parts don't contain duplicate IMergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_);
}
@@ -129,7 +129,7 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
}
MarkRanges MergeTreeReadPool::getRestMarks(const MergeTreeDataPart & part, const MarkRange & from) const
MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const
{
MarkRanges all_part_ranges;

View File

@@ -81,7 +81,7 @@ public:
void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info);
/// This method tells which mark ranges we have to read if we start from @from mark range
MarkRanges getRestMarks(const MergeTreeDataPart & part, const MarkRange & from) const;
MarkRanges getRestMarks(const IMergeTreeDataPart & part, const MarkRange & from) const;
Block getHeader() const;

View File

@@ -0,0 +1,42 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <port/clock.h>
namespace DB
{
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
class MergeTreeReaderCompact : public IMergeTreeReader
{
public:
MergeTreeReaderCompact(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
const MarkRanges & mark_ranges_,
const ReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
/// Returns the number of rows read, or zero if there are no columns to read.
/// If continue_reading is true, continue reading from the last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) override;
private:
ReadBuffer * data_buffer;
using Offsets = std::vector<MarkCache::MappedPtr>;
Offsets offsets;
/// Columns that are read.
friend class MergeTreeRangeReader::DelayedStream;
};
}
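The class body is still a skeleton in this commit; data_buffer and the per-mark offsets only hint at the layout. Purely as an assumption about where this is heading (all columns sharing one data file, with an offset per column stored for every mark), a toy of that addressing scheme:
#include <cassert>
#include <cstddef>
#include <vector>
int main()
{
    // Assumed compact layout: one shared data file; for every mark we keep
    // one offset per column into that file. All values here are invented.
    struct MarkOffsets { std::vector<std::size_t> offset_per_column; };
    std::vector<MarkOffsets> marks = {
        {{0, 100, 260}},    // mark 0: start positions of columns 0..2
        {{400, 520, 700}},  // mark 1
    };
    // Reading column 2 from mark 1 means seeking the shared buffer here:
    std::size_t seek_pos = marks[1].offset_per_column[2];
    assert(seek_pos == 700);
}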

View File

@@ -0,0 +1,13 @@
#pragma once
#include <cstddef>
namespace DB
{
struct ReaderSettings
{
size_t min_bytes_to_use_direct_io = 0;
size_t max_read_buffer_size = 0;
bool save_marks_in_cache = false;
};
}
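The struct bundles the min_bytes_to_use_direct_io / max_read_buffer_size / save_marks_in_cache values that were previously threaded through reader constructors as separate arguments. A self-contained sketch of the fill-from-query-settings pattern used by readFromParts earlier in this commit (QuerySettings stands in for the real Settings; the designated initializers need C++20 or a GNU extension):
#include <cassert>
#include <cstddef>
struct ReaderSettings  // copy of the struct above, repeated for self-containment
{
    std::size_t min_bytes_to_use_direct_io = 0;
    std::size_t max_read_buffer_size = 0;
    bool save_marks_in_cache = false;
};
struct QuerySettings
{
    std::size_t min_bytes_to_use_direct_io = 0;
    std::size_t max_read_buffer_size = 1048576;
};
int main()
{
    QuerySettings settings;
    ReaderSettings reader_settings =
    {
        .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
        .max_read_buffer_size = settings.max_read_buffer_size,
        .save_marks_in_cache = true,
    };
    assert(reader_settings.max_read_buffer_size == 1048576);
}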

View File

@@ -17,13 +17,13 @@ namespace ErrorCodes
MergeTreeReaderStream::MergeTreeReaderStream(
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
MarkCache * mark_cache_, bool save_marks_in_cache_,
UncompressedCache * uncompressed_cache,
size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
const ReaderSettings & settings,
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache, size_t file_size,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
: path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
, mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
, mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
{
/// Compute the size of the buffer.
@@ -71,15 +71,15 @@ MergeTreeReaderStream::MergeTreeReaderStream(
/// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
/// For example: part has single dictionary and all marks point to the same position.
if (max_mark_range_bytes == 0)
max_mark_range_bytes = max_read_buffer_size;
max_mark_range_bytes = settings.max_read_buffer_size;
size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);
size_t buffer_size = std::min(settings.max_read_buffer_size, max_mark_range_bytes);
/// Initialize the objects that shall be used to perform read operations.
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, aio_threshold, buffer_size);
path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, settings.min_bytes_to_use_direct_io, buffer_size);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
@@ -90,7 +90,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
else
{
auto buffer = std::make_unique<CompressedReadBufferFromFile>(
path_prefix + data_file_extension, sum_mark_range_bytes, aio_threshold, buffer_size);
path_prefix + data_file_extension, sum_mark_range_bytes, settings.min_bytes_to_use_direct_io, buffer_size);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
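The buffer sizing above, as a worked example: the buffer never needs to exceed the largest gap between adjacent requested marks, and an all-zero gap (every mark pointing at the same position) falls back to the configured maximum:
#include <algorithm>
#include <cassert>
#include <cstddef>
int main()
{
    const std::size_t max_read_buffer_size = 1 << 20;  // 1 MiB cap from ReaderSettings
    // Case 1: the largest requested mark range spans 200 KiB.
    std::size_t max_mark_range_bytes = 200 * 1024;
    std::size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);
    assert(buffer_size == 200 * 1024);  // no point allocating more than one range
    // Case 2: all marks point at the same position (e.g. a LowCardinality dictionary).
    max_mark_range_bytes = 0;
    if (max_mark_range_bytes == 0)
        max_mark_range_bytes = max_read_buffer_size;  // avoid an empty buffer
    buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);
    assert(buffer_size == max_read_buffer_size);
}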

View File

@@ -17,10 +17,9 @@ public:
MergeTreeReaderStream(
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
MarkCache * mark_cache, bool save_marks_in_cache,
UncompressedCache * uncompressed_cache,
size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReaderSettings & settings_,
MarkCache * mark_cache, UncompressedCache * uncompressed_cache,
size_t file_size, const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void seekToMark(size_t index);

View File

@@ -4,7 +4,7 @@
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReaderWide.h>
#include <Common/typeid_cast.h>
#include <Poco/File.h>
@@ -27,20 +27,14 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
}
MergeTreeReader::~MergeTreeReader() = default;
MergeTreeReader::MergeTreeReader(const String & path_,
const MergeTreeData::DataPartPtr & data_part_, const NamesAndTypesList & columns_,
UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_, bool save_marks_in_cache_,
const MergeTreeData & storage_, const MarkRanges & all_mark_ranges_,
size_t aio_threshold_, size_t max_read_buffer_size_, const ValueSizeMap & avg_value_size_hints_,
MergeTreeReaderWide::MergeTreeReaderWide(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
const MarkRanges & mark_ranges_, const ReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: data_part(data_part_), avg_value_size_hints(avg_value_size_hints_), path(path_), columns(columns_)
, uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_), storage(storage_)
, all_mark_ranges(all_mark_ranges_), aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_)
: IMergeTreeReader(data_part_, columns_
, uncompressed_cache_, mark_cache_, mark_ranges_
, settings_, avg_value_size_hints_)
{
try
{
@@ -54,14 +48,7 @@ MergeTreeReader::MergeTreeReader(const String & path_,
}
}
const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() const
{
return avg_value_size_hints;
}
size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
{
size_t read_rows = 0;
try
@@ -150,7 +137,7 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
return read_rows;
}
void MergeTreeReader::addStreams(const String & name, const IDataType & type,
void MergeTreeReaderWide::addStreams(const String & name, const IDataType & type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
@@ -170,9 +157,8 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type,
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->getMarksCount(),
all_mark_ranges, mark_cache, save_marks_in_cache,
all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
aio_threshold, max_read_buffer_size,
&data_part->index_granularity_info,
profile_callback, clock_type));
};
@@ -182,7 +168,7 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type,
}
void MergeTreeReader::readData(
void MergeTreeReaderWide::readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool with_offsets)
@@ -232,147 +218,4 @@ void MergeTreeReader::readData(
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
}
static bool arrayHasNoElementsRead(const IColumn & column)
{
const ColumnArray * column_array = typeid_cast<const ColumnArray *>(&column);
if (!column_array)
return false;
size_t size = column_array->size();
if (!size)
return false;
size_t data_size = column_array->getData().size();
if (data_size)
return false;
size_t last_offset = column_array->getOffsets()[size - 1];
return last_offset != 0;
}
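What this check detects, on toy data: the offsets stream was read (so the array column claims elements) while the nested data stream came back empty, i.e. the column was only partially read and must be dropped and rebuilt:
#include <cassert>
#include <cstddef>
#include <vector>
int main()
{
    // Toy stand-in for ColumnArray: offsets mark where each row's array ends.
    std::vector<std::size_t> offsets = {3, 5};  // two rows, 5 elements expected
    std::vector<int> data;                      // ...but no element data was read
    // Mirrors arrayHasNoElementsRead: non-empty column + empty data + non-zero
    // last offset means the streams are inconsistent.
    bool no_elements_read = !offsets.empty() && data.empty() && offsets.back() != 0;
    assert(no_elements_read);
}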
void MergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults, size_t num_rows)
{
try
{
/// For a missing column of a nested data structure we must create not a column of empty
/// arrays, but a column of arrays of correct length.
/// First, collect offset columns for all arrays in the block.
OffsetColumns offset_columns;
for (size_t i = 0; i < res.columns(); ++i)
{
const ColumnWithTypeAndName & column = res.safeGetByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(column.column.get()))
{
String offsets_name = Nested::extractTableName(column.name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason multiple offsets columns are present for the same nested data structure,
/// choose the one that is not empty.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
}
}
should_evaluate_missing_defaults = false;
should_reorder = false;
/// insert default values only for columns without default expressions
for (const auto & requested_column : columns)
{
bool has_column = res.has(requested_column.name);
if (has_column)
{
const auto & col = *res.getByName(requested_column.name).column;
if (arrayHasNoElementsRead(col))
{
res.erase(requested_column.name);
has_column = false;
}
}
if (!has_column)
{
should_reorder = true;
if (storage.getColumns().hasDefault(requested_column.name))
{
should_evaluate_missing_defaults = true;
continue;
}
ColumnWithTypeAndName column_to_add;
column_to_add.name = requested_column.name;
column_to_add.type = requested_column.type;
String offsets_name = Nested::extractTableName(column_to_add.name);
if (offset_columns.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*column_to_add.type).getNestedType();
size_t nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
column_to_add.column = ColumnArray::create(nested_column, offsets_column);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere
/// but in some blocks (from other parts) it can be a full column.
column_to_add.column = column_to_add.type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
res.insert(std::move(column_to_add));
}
}
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names, const String * filter_name)
{
try
{
Block ordered_block;
for (const auto & name : ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
if (filter_name && !ordered_block.has(*filter_name) && res.has(*filter_name))
ordered_block.insert(res.getByName(*filter_name));
std::swap(res, ordered_block);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
void MergeTreeReader::evaluateMissingDefaults(Block & res)
{
try
{
DB::evaluateMissingDefaults(res, columns, storage.getColumns().getDefaults(), storage.global_context);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
}

View File

@@ -0,0 +1,49 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <port/clock.h>
namespace DB
{
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
class MergeTreeReaderWide : public IMergeTreeReader
{
public:
MergeTreeReaderWide(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_,
UncompressedCache * uncompressed_cache_,
MarkCache * mark_cache_,
const MarkRanges & mark_ranges_,
const ReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
/// Returns the number of rows read, or zero if there are no columns to read.
/// If continue_reading is true, continue reading from the last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) override;
private:
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
FileStreams streams;
/// Columns that are read.
void addStreams(const String & name, const IDataType & type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool read_offsets = true);
friend class MergeTreeRangeReader::DelayedStream;
};
}

View File

@@ -1,6 +1,6 @@
#include <Storages/MergeTree/MergeTreeReverseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Core/Defines.h>
@@ -24,16 +24,14 @@ MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns,
size_t min_bytes_to_use_direct_io_,
size_t max_read_buffer_size_,
bool save_marks_in_cache_,
const ReaderSettings & reader_settings_,
const Names & virt_column_names_,
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_,
use_uncompressed_cache_, virt_column_names_},
required_columns{required_columns_},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
@@ -89,19 +87,14 @@ MergeTreeReverseSelectBlockInputStream::MergeTreeReverseSelectBlockInputStream(
owned_mark_cache = storage.global_context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
reader = data_part->getReader(task_columns.columns, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
pre_reader = data_part->getReader(task_columns.pre_columns, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
Block MergeTreeReverseSelectBlockInputStream::getHeader() const
{
return header;

View File

@ -27,9 +27,7 @@ public:
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
bool save_marks_in_cache,
const ReaderSettings & reader_settings,
const Names & virt_column_names = {},
size_t part_index_in_query = 0,
bool quiet = false);

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/MergeTreeSelectBlockInputStream.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Core/Defines.h>
@ -24,16 +24,14 @@ MergeTreeSelectBlockInputStream::MergeTreeSelectBlockInputStream(
bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
bool check_columns_,
size_t min_bytes_to_use_direct_io_,
size_t max_read_buffer_size_,
bool save_marks_in_cache_,
const ReaderSettings & reader_settings_,
const Names & virt_column_names_,
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{required_columns_},
data_part{owned_data_part_},
part_columns_lock(data_part->columns_lock),
@ -126,16 +124,12 @@ try
owned_mark_cache = storage.global_context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, data_part, task_columns.columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
reader = data_part->getReader(task_columns.columns, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, data_part, task_columns.pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), save_marks_in_cache, storage,
all_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
pre_reader = data_part->getReader(task_columns.pre_columns, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
return true;

View File

@ -27,9 +27,7 @@ public:
bool use_uncompressed_cache,
const PrewhereInfoPtr & prewhere_info,
bool check_columns,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
bool save_marks_in_cache,
const ReaderSettings & reader_settings,
const Names & virt_column_names = {},
size_t part_index_in_query = 0,
bool quiet = false);

View File

@ -51,13 +51,17 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
columns_for_reader = data_part->columns.addTypes(columns_to_read);
}
reader = std::make_unique<MergeTreeReader>(
data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr,
mark_cache.get(), /* save_marks_in_cache = */ false, storage,
ReaderSettings reader_settings =
{
/// This is a hack
.min_bytes_to_use_direct_io = read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
.max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
.save_marks_in_cache = false
};
reader = data_part->getReader(columns_for_reader,
MarkRanges{MarkRange(0, data_part->getMarksCount())},
/* bytes to use AIO (this is a hack) */
read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
DBMS_DEFAULT_BUFFER_SIZE);
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
}
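
ReaderSettings replaces the three loose arguments (min_bytes_to_use_direct_io, max_read_buffer_size, save_marks_in_cache) that previously threaded through every stream constructor, so a new reader option no longer requires signature changes along the whole chain. For contrast with the merge-oriented settings above, a typical configuration for interactive reads might look like this sketch, using only the fields visible in this commit:

ReaderSettings reader_settings =
{
    /// Stay on the page cache; never switch this stream to O_DIRECT.
    .min_bytes_to_use_direct_io = std::numeric_limits<size_t>::max(),
    .max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
    /// Unlike merges, selects benefit from keeping marks cached.
    .save_marks_in_cache = true
};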

View File

@ -1,7 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MarkRange.h>
#include <memory>
@ -58,7 +58,7 @@ private:
Logger * log = &Logger::get("MergeTreeSequentialBlockInputStream");
std::shared_ptr<MarkCache> mark_cache;
using MergeTreeReaderPtr = std::unique_ptr<MergeTreeReader>;
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
MergeTreeReaderPtr reader;
/// current mark at which we stop reading

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputStream.h>
@ -17,12 +17,12 @@ MergeTreeThreadSelectBlockInputStream::MergeTreeThreadSelectBlockInputStream(
const MergeTreeData & storage_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const Settings & settings,
const ReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectBlockInputStream{storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, use_uncompressed_cache_, true, virt_column_names_},
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
thread{thread_},
pool{pool_}
{
@ -80,15 +80,14 @@ bool MergeTreeThreadSelectBlockInputStream::getNewTask()
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
owned_mark_cache = storage.global_context.getMarkCache();
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
reader = task->data_part->getReader(task->columns, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, rest_mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size, MergeTreeReader::ValueSizeMap{}, profile_callback);
pre_reader = task->data_part->getReader(task->pre_columns, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
IMergeTreeReader::ValueSizeMap{}, profile_callback);
}
else
{
@ -97,18 +96,17 @@ bool MergeTreeThreadSelectBlockInputStream::getNewTask()
{
auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
/// retain avg_value_size_hints
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, rest_mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size,
reader = task->data_part->getReader(task->columns, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
reader->getAvgValueSizeHints(), profile_callback);
if (prewhere_info)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,
storage, rest_mark_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size, pre_reader->getAvgValueSizeHints(), profile_callback);
pre_reader = task->data_part->getReader(task->pre_columns, rest_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
pre_reader->getAvgValueSizeHints(), profile_callback);
}
}
last_readed_part_path = path;
return true;
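
When the pool hands out another range of the same part, passing getAvgValueSizeHints() from the old reader into the new one preserves the learned average column sizes, so block size prediction does not reset between tasks. The handover reduces to the following, where the cache and settings names stand for the members used above:

/// Same part as the previous task: carry the accumulated size hints over
/// before the old reader is replaced.
auto hints = reader->getAvgValueSizeHints();
reader = task->data_part->getReader(task->columns, rest_mark_ranges,
    uncompressed_cache, mark_cache, reader_settings, hints, profile_callback);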

View File

@ -24,7 +24,7 @@ public:
const MergeTreeData & storage_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const Settings & settings_,
const ReaderSettings & reader_settings_,
const Names & virt_column_names_);
String getName() const override { return "MergeTreeThread"; }

View File

@ -0,0 +1,13 @@
#pragma once
#include <Core/Block.h>
#include <memory>
namespace DB
{
/// Placeholder for the polymorphic part writer interface; to be filled in as development continues.
class MergeTreeWriter
{
};
using MergeTreeWriterPtr = std::shared_ptr<MergeTreeWriter>;
}

View File

@ -2,7 +2,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Common/StringUtils/StringUtils.h>

View File

@ -8,7 +8,7 @@
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{

View File

@ -1,7 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Core/Defines.h>
@ -12,11 +12,11 @@ namespace DB
{
/// A Storage that allows reading from a single MergeTree data part.
class StorageFromMergeTreeDataPart : public ext::shared_ptr_helper<StorageFromMergeTreeDataPart>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageFromMergeTreeDataPart>;
public:
String getName() const override { return "FromMergeTreeDataPart"; }
String getTableName() const override { return part->storage.getTableName() + " (part " + part->name + ")"; }
String getDatabaseName() const override { return part->storage.getDatabaseName(); }
@ -40,7 +40,7 @@ public:
}
protected:
StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
: IStorage(part_->storage.getVirtuals()), part(part_)
{
setColumns(part_->storage.getColumns());
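
Because the constructor is protected, instances come from the factory generated by ext::shared_ptr_helper. A sketch of the intended use, wrapping one part so that an ordinary read pipeline sees only its data (e.g. when processing a single part in isolation):

/// part is a MergeTreeData::DataPartPtr.
auto storage = StorageFromMergeTreeDataPart::create(part);
/// storage now behaves as an IStorage whose only data is this single part.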

View File

@ -14,7 +14,7 @@
#include <Storages/PartitionCommands.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
@ -2332,7 +2332,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
}
bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version)
bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const IMergeTreeDataPart & part, Int64 mutation_version)
{
auto zookeeper = getZooKeeper();
@ -2691,7 +2691,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
{
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
if (auto part = getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
{
LOG_DEBUG(log, "Part " << part->name << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.

View File

@ -434,7 +434,7 @@ private:
bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version);
bool createLogEntryToMutatePart(const IMergeTreeDataPart & part, Int64 mutation_version);
/// Exchange parts.

View File

@ -58,7 +58,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name_)
void StorageSystemParts::processNextStorage(MutableColumns & columns_, const StoragesInfo & info, bool has_state_column)
{
using State = MergeTreeDataPart::State;
using State = IMergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
@ -117,7 +117,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
MinimalisticDataPartChecksums helper;
{
/// TODO: MergeTreeDataPart structure is too error-prone.
/// TODO: IMergeTreeDataPart structure is too error-prone.
std::shared_lock<std::shared_mutex> lock(part->columns_lock);
helper.computeTotalChecksums(part->checksums);
}

View File

@ -98,7 +98,7 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns_, co
auto index_size_in_bytes = part->getIndexSizeInBytes();
auto index_size_in_allocated_bytes = part->getIndexSizeInAllocatedBytes();
using State = MergeTreeDataPart::State;
using State = IMergeTreeDataPart::State;
for (const auto & column : part->columns)