Removed huge chunk of bad code

Alexey Milovidov 2019-05-03 05:00:57 +03:00
parent 41a32d8693
commit a6ca9f266f
23 changed files with 426 additions and 572 deletions
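The whole diff applies one refactoring: StorageMergeTree and StorageReplicatedMergeTree used to hold a separate MergeTreeData member named `data` and forward dozens of calls to it, so every caller wrote storage.data.foo(). After this commit MergeTreeData itself derives from IStorage and both storages derive from MergeTreeData, which removes the forwarders and the .data indirection. A minimal sketch of the before/after shape, using simplified hypothetical declarations rather than the real headers:

// Hypothetical, heavily trimmed sketch of the refactoring shown in this commit.
struct IStorage
{
    virtual ~IStorage() = default;
    virtual bool supportsSampling() const { return false; }
};

// After the commit, MergeTreeData is itself an IStorage and carries the shared state.
struct MergeTreeData : IStorage
{
    bool supportsSampling() const override { return true; }
    // parts, settings, format_version, key expressions, merge/mutate helpers ...
};

// Before: the storage owned a MergeTreeData member and forwarded everything to it.
struct StorageMergeTree_Before : IStorage
{
    MergeTreeData data;
    bool supportsSampling() const override { return data.supportsSampling(); }
    // ... dozens of similar one-line delegates; callers wrote storage.data.getDataParts()
};

// After: the storage simply derives from MergeTreeData, so the delegates disappear
// and callers write storage.getDataParts() directly.
struct StorageMergeTree_After : MergeTreeData
{
};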

View File

@ -191,12 +191,12 @@ void AsynchronousMetrics::update()
"Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name()));
}
calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getData().getMaxPartsCountForPartition());
calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getMaxPartsCountForPartition());
}
if (table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getData().getMaxPartsCountForPartition());
calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition());
}
}
}

View File

@ -30,6 +30,8 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
@ -43,8 +45,7 @@
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
@ -590,13 +591,11 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final())
MergeTreeWhereOptimizer{query_info, context, merge_tree.getData(), query_analyzer->getRequiredSourceColumns(), log};
MergeTreeWhereOptimizer{query_info, context, merge_tree, query_analyzer->getRequiredSourceColumns(), log};
};
if (const StorageMergeTree * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get()))
optimize_prewhere(*merge_tree);
else if (const StorageReplicatedMergeTree * replicated_merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage.get()))
optimize_prewhere(*replicated_merge_tree);
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
optimize_prewhere(*merge_tree_data);
}
AnalysisResult expressions;
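The hunk above also shows a side effect of the new hierarchy: two engine-specific dynamic_casts collapse into a single cast to the shared MergeTreeData base. A self-contained sketch of the idiom; the helper asMergeTreeFamily is hypothetical and only illustrates the cast:

struct IStorage { virtual ~IStorage() = default; };
struct MergeTreeData : IStorage { };                    // common base of the MergeTree family
struct StorageMergeTree : MergeTreeData { };
struct StorageReplicatedMergeTree : MergeTreeData { };

// One cast now recognizes every MergeTree-family engine, present and future.
const MergeTreeData * asMergeTreeFamily(const IStorage * storage)
{
    return dynamic_cast<const MergeTreeData *>(storage);
}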

View File

@ -1,8 +1,7 @@
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
@ -86,12 +85,8 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
static NameSet getKeyColumns(const StoragePtr & storage)
{
const MergeTreeData * merge_tree_data = nullptr;
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(storage.get()))
merge_tree_data = &merge_tree->getData();
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()))
merge_tree_data = &replicated_merge_tree->getData();
else
const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get());
if (!merge_tree_data)
return {};
NameSet key_columns;

View File

@ -14,7 +14,7 @@ Block MergeTreeBlockOutputStream::getHeader() const
void MergeTreeBlockOutputStream::write(const Block & block)
{
storage.data.delayInsertOrThrowIfNeeded();
storage.delayInsertOrThrowIfNeeded();
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);
for (auto & current_block : part_blocks)
@ -22,7 +22,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
storage.data.renameTempPartAndAdd(part, &storage.increment);
storage.renameTempPartAndAdd(part, &storage.increment);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());

View File

@ -2665,7 +2665,7 @@ bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const A
return false;
}
bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const
bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const
{
/// Make sure that the left side of the IN operator contain part of the key.
/// If there is a tuple on the left side of the IN operator, at least one item of the tuple
@ -2694,18 +2694,12 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con
}
}
MergeTreeData * MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const
MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const
{
MergeTreeData * src_data;
if (auto storage_merge_tree = dynamic_cast<StorageMergeTree *>(source_table.get()))
src_data = &storage_merge_tree->data;
else if (auto storage_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(source_table.get()))
src_data = &storage_replicated_merge_tree->data;
else
{
throw Exception("Table " + table_name + " supports attachPartitionFrom only for MergeTree or ReplicatedMergeTree engines."
MergeTreeData * src_data = dynamic_cast<MergeTreeData *>(source_table.get());
if (!src_data)
throw Exception("Table " + table_name + " supports attachPartitionFrom only for MergeTree family of table engines."
" Got " + source_table->getName(), ErrorCodes::NOT_IMPLEMENTED);
}
if (getColumns().getAllPhysical().sizeOfDifference(src_data->getColumns().getAllPhysical()))
throw Exception("Tables have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
@ -2724,7 +2718,7 @@ MergeTreeData * MergeTreeData::checkStructureAndGetMergeTreeData(const StoragePt
if (format_version != src_data->format_version)
throw Exception("Tables have different format_version", ErrorCodes::BAD_ARGUMENTS);
return src_data;
return *src_data;
}
MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const MergeTreeData::DataPartPtr & src_part,

View File

@ -3,10 +3,11 @@
#include <Common/SimpleIncrement.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/ITableDeclaration.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
@ -89,7 +90,7 @@ namespace ErrorCodes
/// - MergeTreeDataWriter
/// - MergeTreeDataMergerMutator
class MergeTreeData : public ITableDeclaration
class MergeTreeData : public IStorage
{
public:
/// Function to call if the part is suspected to contain corrupt data.
@ -347,9 +348,9 @@ public:
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
bool supportsPrewhere() const { return true; }
bool supportsPrewhere() const override { return true; }
bool supportsFinal() const
bool supportsFinal() const override
{
return merging_params.mode == MergingParams::Collapsing
|| merging_params.mode == MergingParams::Summing
@ -358,7 +359,7 @@ public:
|| merging_params.mode == MergingParams::VersionedCollapsing;
}
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) const;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override;
Int64 getMaxBlockNumber();
@ -385,12 +386,10 @@ public:
|| column_name == "_sample_factor";
}
String getDatabaseName() const { return database_name; }
String getTableName() const { return table_name; }
String getDatabaseName() const override { return database_name; }
String getTableName() const override { return table_name; }
String getFullPath() const { return full_path; }
String getLogName() const { return log_name; }
/// Returns a copy of the list so that the caller shouldn't worry about locks.
@ -539,22 +538,23 @@ public:
*/
static ASTPtr extractKeyExpressionList(const ASTPtr & node);
Names getColumnsRequiredForPartitionKey() const { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); }
Names getColumnsRequiredForPartitionKey() const override { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); }
bool hasSortingKey() const { return !sorting_key_columns.empty(); }
bool hasPrimaryKey() const { return !primary_key_columns.empty(); }
bool hasSkipIndices() const { return !skip_indices.empty(); }
bool hasTableTTL() const { return ttl_table_ast != nullptr; }
ASTPtr getSortingKeyAST() const { return sorting_key_expr_ast; }
ASTPtr getPrimaryKeyAST() const { return primary_key_expr_ast; }
ASTPtr getPartitionKeyAST() const override { return partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; }
ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; }
Names getColumnsRequiredForSortingKey() const { return sorting_key_expr->getRequiredColumns(); }
Names getColumnsRequiredForPrimaryKey() const { return primary_key_expr->getRequiredColumns(); }
Names getColumnsRequiredForSortingKey() const override { return sorting_key_expr->getRequiredColumns(); }
Names getColumnsRequiredForPrimaryKey() const override { return primary_key_expr->getRequiredColumns(); }
bool supportsSampling() const { return sample_by_ast != nullptr; }
bool supportsSampling() const override { return sample_by_ast != nullptr; }
ASTPtr getSamplingExpression() const { return sample_by_ast; }
Names getColumnsRequiredForSampling() const { return columns_required_for_sampling; }
Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
@ -592,11 +592,13 @@ public:
/// Extracts MergeTreeData of other *MergeTree* storage
/// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
/// Tables structure should be locked.
MergeTreeData * checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const;
MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const;
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPart(const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
const MergeTreePartInfo & dst_part_info);
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
MergeTreeDataFormatVersion format_version;
Context global_context;
@ -655,13 +657,12 @@ public:
/// For generating names of temporary parts during insertion.
SimpleIncrement insert_increment;
private:
protected:
friend struct MergeTreeDataPart;
friend class StorageMergeTree;
friend class StorageReplicatedMergeTree;
friend class MergeTreeDataMergerMutator;
friend class ReplicatedMergeTreeAlterThread;
friend struct ReplicatedMergeTreeTableMetadata;
friend class StorageReplicatedMergeTree;
ASTPtr partition_by_ast;
ASTPtr order_by_ast;
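One detail worth noting in this header: mayBenefitFromIndexForIn gains a const Context & parameter so its signature matches the IStorage virtual it now overrides, and the override keyword is what enforces that match at compile time. A stand-alone sketch of the idiom with hypothetical minimal types:

#include <memory>

struct IAST;
using ASTPtr = std::shared_ptr<IAST>;   // stand-in for the real alias
struct Context;

struct IStorage
{
    virtual ~IStorage() = default;
    virtual bool mayBenefitFromIndexForIn(const ASTPtr &, const Context &) const { return false; }
};

struct MergeTreeDataSketch : IStorage
{
    // Omitting the Context parameter would silently hide the base method instead of
    // overriding it; `override` turns that mistake into a compile error.
    bool mayBenefitFromIndexForIn(const ASTPtr &, const Context &) const override { return true; }
};

The same mechanism covers the other methods newly marked override above (supportsPrewhere, supportsFinal, getTableName, and so on).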

View File

@ -36,7 +36,7 @@ void ReplicatedMergeTreeAlterThread::run()
try
{
/** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
* as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
* as well as a description of columns in local file with metadata (storage.getColumnsList()).
*
* If these descriptions are different - you need to do ALTER.
*
@ -83,7 +83,7 @@ void ReplicatedMergeTreeAlterThread::run()
const String & metadata_str = metadata_znode.contents;
auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage.data).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
/// If you need to lock table structure, then suspend merges.
ActionLock merge_blocker = storage.merger_mutator.actions_blocker.cancel();
@ -123,7 +123,7 @@ void ReplicatedMergeTreeAlterThread::run()
}
/// You need to get a list of parts under table lock to avoid race condition with merge.
parts = storage.data.getDataParts();
parts = storage.getDataParts();
storage.columns_version = columns_version;
storage.metadata_version = metadata_version;
@ -140,7 +140,7 @@ void ReplicatedMergeTreeAlterThread::run()
int changed_parts = 0;
if (!changed_columns_version)
parts = storage.data.getDataParts();
parts = storage.getDataParts();
const auto columns_for_parts = storage.getColumns().getAllPhysical();
const auto indices_for_parts = storage.getIndices();
@ -150,7 +150,7 @@ void ReplicatedMergeTreeAlterThread::run()
/// Update the part and write result to temporary files.
/// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
/// node /flags/force_alter.
auto transaction = storage.data.alterDataPart(part, columns_for_parts, indices_for_parts.indices, false);
auto transaction = storage.alterDataPart(part, columns_for_parts, indices_for_parts.indices, false);
if (!transaction)
continue;
@ -160,7 +160,7 @@ void ReplicatedMergeTreeAlterThread::run()
}
/// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
storage.data.recalculateColumnSizes();
storage.recalculateColumnSizes();
if (changed_columns_version)
{

View File

@ -35,7 +35,7 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block), deduplicate(deduplicate_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
log(&Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
/// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1)
@ -109,7 +109,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
last_block_is_duplicate = false;
/// TODO Is it possible to not lock the table structure here?
storage.data.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
auto zookeeper = storage.getZooKeeper();
assertSessionIsNotExpired(zookeeper);
@ -297,8 +297,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
quorum_info.host_node_version));
}
MergeTreeData::Transaction transaction(storage.data); /// If you can not add a part to ZK, we'll remove it back from the working set.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
storage.renameTempPartAndAdd(part, nullptr, &transaction);
Coordination::Responses responses;
int32_t multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
@ -414,7 +414,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
storage.data.throwInsertIfNeeded();
storage.throwInsertIfNeeded();
}

View File

@ -27,8 +27,8 @@ ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplic
void ReplicatedMergeTreeCleanupThread::run()
{
const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage.data.settings.cleanup_delay_period_random_add * 1000)(rng);
const auto CLEANUP_SLEEP_MS = storage.settings.cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage.settings.cleanup_delay_period_random_add * 1000)(rng);
try
{
@ -57,7 +57,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{
/// TODO: Implement tryLockStructureForShare.
auto lock = storage.lockStructureForShare(false, "");
storage.data.clearOldTemporaryDirectories();
storage.clearOldTemporaryDirectories();
}
/// This is loose condition: no problem if we actually had lost leadership at this moment
@ -82,7 +82,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
int children_count = stat.numChildren;
/// We will wait for 1.1 times more records to accumulate than necessary.
if (static_cast<double>(children_count) < storage.data.settings.min_replicated_logs_to_keep * 1.1)
if (static_cast<double>(children_count) < storage.settings.min_replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
@ -100,8 +100,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
std::sort(entries.begin(), entries.end());
String min_saved_record_log_str = entries[
entries.size() > storage.data.settings.max_replicated_logs_to_keep.value
? entries.size() - storage.data.settings.max_replicated_logs_to_keep.value
entries.size() > storage.settings.max_replicated_logs_to_keep.value
? entries.size() - storage.settings.max_replicated_logs_to_keep.value
: 0];
/// Replicas that were marked is_lost but are active.
@ -203,7 +203,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
min_saved_log_pointer = std::min(min_saved_log_pointer, min_log_pointer_lost_candidate);
/// We will not touch the last `min_replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end());
entries.erase(entries.end() - std::min<UInt64>(entries.size(), storage.settings.min_replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_saved_log_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
@ -294,12 +294,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().ctime;
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.data.settings.replicated_deduplication_window_seconds));
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.settings.replicated_deduplication_window_seconds));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold{{}, time_threshold};
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.data.settings.replicated_deduplication_window.value);
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage.settings.replicated_deduplication_window.value);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
@ -392,10 +392,10 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
if (!storage.data.settings.finished_mutations_to_keep)
if (!storage.settings.finished_mutations_to_keep)
return;
if (storage.queue.countFinishedMutations() <= storage.data.settings.finished_mutations_to_keep)
if (storage.queue.countFinishedMutations() <= storage.settings.finished_mutations_to_keep)
{
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
@ -422,10 +422,10 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
/// Do not remove entries that are greater than `min_pointer` (they are not done yet).
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
/// Do not remove last `storage.data.settings.finished_mutations_to_keep` entries.
if (entries.size() <= storage.data.settings.finished_mutations_to_keep)
/// Do not remove last `storage.settings.finished_mutations_to_keep` entries.
if (entries.size() <= storage.settings.finished_mutations_to_keep)
return;
entries.erase(entries.end() - storage.data.settings.finished_mutations_to_keep, entries.end());
entries.erase(entries.end() - storage.settings.finished_mutations_to_keep, entries.end());
if (entries.empty())
return;

View File

@ -90,7 +90,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
}
/// If the part is not in ZooKeeper, we'll check if it's at least somewhere.
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.data.format_version);
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
/** The logic is as follows:
* - if some live or inactive replica has such a part, or a part covering it
@ -126,7 +126,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
Strings parts = zookeeper->getChildren(storage.zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.data.format_version);
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);
if (part_on_replica_info.contains(part_info))
{
@ -189,9 +189,9 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
/// If the part is still in the PreCommitted -> Committed transition, it is not lost
/// and there is no need to go searching for it on other replicas. To definitely find the needed part
/// if it exists (or a part containing it) we first search among the PreCommitted parts.
auto part = storage.data.getPartIfExists(part_name, {MergeTreeDataPartState::PreCommitted});
auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::PreCommitted});
if (!part)
part = storage.data.getActiveContainingPart(part_name);
part = storage.getActiveContainingPart(part_name);
/// We do not have this or a covering part.
if (!part)
@ -235,8 +235,8 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
checkDataPart(
part,
true,
storage.data.primary_key_data_types,
storage.data.skip_indices,
storage.primary_key_data_types,
storage.skip_indices,
[this] { return need_stop.load(); });
if (need_stop)
@ -259,7 +259,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
storage.removePartAndEnqueueFetch(part_name);
/// Delete part locally.
storage.data.forgetPartAndMoveToDetached(part, "broken_");
storage.forgetPartAndMoveToDetached(part, "broken_");
}
}
else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
@ -270,7 +270,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
LOG_ERROR(log, "Unexpected part " << part_name << " in filesystem. Removing.");
storage.data.forgetPartAndMoveToDetached(part, "unexpected_");
storage.forgetPartAndMoveToDetached(part, "unexpected_");
}
else
{

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_)
: storage(storage_)
, format_version(storage.data.format_version)
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
{}
@ -62,14 +62,14 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
Strings children = zookeeper->getChildren(queue_path);
auto to_remove_it = std::remove_if(
children.begin(), children.end(), [&](const String & path)
{
return already_loaded_paths.count(path);
});
children.begin(), children.end(), [&](const String & path)
{
return already_loaded_paths.count(path);
});
LOG_DEBUG(log,
"Having " << (to_remove_it - children.begin()) << " queue entries to load, "
<< (children.end() - to_remove_it) << " entries already loaded.");
"Having " << (to_remove_it - children.begin()) << " queue entries to load, "
<< (children.end() - to_remove_it) << " entries already loaded.");
children.erase(to_remove_it, children.end());
std::sort(children.begin(), children.end());

View File

@ -44,11 +44,11 @@ ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(Storage
, log(&Logger::get(log_name))
, active_node_identifier(generateActiveNodeIdentifier())
{
check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
check_period_ms = storage.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
check_period_ms = storage.data.settings.check_delay_period * 1000;
if (check_period_ms > static_cast<Int64>(storage.settings.check_delay_period) * 1000)
check_period_ms = storage.settings.check_delay_period * 1000;
task = storage.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
}
@ -121,7 +121,7 @@ void ReplicatedMergeTreeRestartingThread::run()
}
time_t current_time = time(nullptr);
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.settings.check_delay_period))
{
/// Find out lag of replicas.
time_t absolute_delay = 0;
@ -136,10 +136,10 @@ void ReplicatedMergeTreeRestartingThread::run()
/// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
&& relative_delay > static_cast<time_t>(storage.settings.min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
<< storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
<< storage.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
@ -181,7 +181,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
updateQuorumIfWeHavePart();
if (storage.data.settings.replicated_can_become_leader)
if (storage.settings.replicated_can_become_leader)
storage.enterLeaderElection();
else
LOG_INFO(log, "Will not enter leader election because replicated_can_become_leader=0");
@ -239,13 +239,13 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
for (auto part_name : failed_parts)
{
auto part = storage.data.getPartIfExists(
auto part = storage.getPartIfExists(
part_name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
if (part)
{
LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
storage.data.forgetPartAndMoveToDetached(part, "noquorum_");
storage.forgetPartAndMoveToDetached(part, "noquorum_");
storage.queue.removeFromVirtualParts(part->info);
}
}

View File

@ -32,9 +32,9 @@ public:
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & /* query_context */) const override
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override
{
return part->storage.mayBenefitFromIndexForIn(left_in_operand);
return part->storage.mayBenefitFromIndexForIn(left_in_operand, query_context);
}
protected:

View File

@ -60,28 +60,28 @@ StorageMergeTree::StorageMergeTree(
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergeTreeData::MergingParams & merging_params_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
global_context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name,
full_path, columns_, indices_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
settings_, false, attach),
reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
: MergeTreeData(database_name_, table_name_,
path_ + escapeForFileName(table_name_) + '/',
columns_, indices_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
settings_, false, attach),
path(path_),
background_pool(context_.getBackgroundPool()),
reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool())
{
if (path_.empty())
throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
if (path.empty())
throw Exception("MergeTree require data path", ErrorCodes::INCORRECT_FILE_NAME);
data.loadDataParts(has_force_restore_data_flag);
loadDataParts(has_force_restore_data_flag);
if (!attach && !data.getDataParts().empty())
if (!attach && !getDataParts().empty())
throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);
increment.set(data.getMaxBlockNumber());
increment.set(getMaxBlockNumber());
loadMutations();
}
@ -89,11 +89,11 @@ StorageMergeTree::StorageMergeTree(
void StorageMergeTree::startup()
{
data.clearOldPartsFromFilesystem();
clearOldPartsFromFilesystem();
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
data.clearOldTemporaryDirectories(0);
clearOldTemporaryDirectories(0);
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
@ -135,16 +135,16 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Con
void StorageMergeTree::checkTableCanBeDropped() const
{
const_cast<MergeTreeData &>(getData()).recalculateColumnSizes();
global_context.checkTableCanBeDropped(database_name, table_name, getData().getTotalActiveSizeInBytes());
const_cast<StorageMergeTree &>(*this).recalculateColumnSizes();
global_context.checkTableCanBeDropped(database_name, table_name, getTotalActiveSizeInBytes());
}
void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
{
const_cast<MergeTreeData &>(getData()).recalculateColumnSizes();
const_cast<StorageMergeTree &>(*this).recalculateColumnSizes();
const String partition_id = data.getPartitionIDFromQuery(partition, global_context);
auto parts_to_remove = data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
const String partition_id = getPartitionIDFromQuery(partition, global_context);
auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
UInt64 partition_size = 0;
@ -158,7 +158,7 @@ void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
void StorageMergeTree::drop()
{
shutdown();
data.dropAllData();
dropAllData();
}
void StorageMergeTree::truncate(const ASTPtr &, const Context &)
@ -170,20 +170,20 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &)
/// NOTE: It's assumed that this method is called under lockForAlter.
auto parts_to_remove = data.getDataPartsVector();
data.removePartsFromWorkingSet(parts_to_remove, true);
auto parts_to_remove = getDataPartsVector();
removePartsFromWorkingSet(parts_to_remove, true);
LOG_INFO(log, "Removed " << parts_to_remove.size() << " parts.");
}
data.clearOldPartsFromFilesystem();
clearOldPartsFromFilesystem();
}
void StorageMergeTree::rename(const String & new_path_to_db, const String & /*new_database_name*/, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path);
setPath(new_full_path);
path = new_path_to_db;
table_name = new_table_name;
@ -216,21 +216,21 @@ void StorageMergeTree::alter(
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
data.checkAlter(params, context);
checkAlter(params, context);
auto new_columns = data.getColumns();
auto new_indices = data.getIndices();
ASTPtr new_order_by_ast = data.order_by_ast;
ASTPtr new_primary_key_ast = data.primary_key_ast;
ASTPtr new_ttl_table_ast = data.ttl_table_ast;
auto new_columns = getColumns();
auto new_indices = getIndices();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
params.apply(new_columns, new_indices, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
auto parts = data.getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
auto parts = getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
auto columns_for_parts = new_columns.getAllPhysical();
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
for (const MergeTreeData::DataPartPtr & part : parts)
std::vector<AlterDataPartTransactionPtr> transactions;
for (const DataPartPtr & part : parts)
{
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_indices.indices, false))
if (auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false))
transactions.push_back(std::move(transaction));
}
@ -240,28 +240,28 @@ void StorageMergeTree::alter(
{
auto & storage_ast = ast.as<ASTStorage &>();
if (new_order_by_ast.get() != data.order_by_ast.get())
if (new_order_by_ast.get() != order_by_ast.get())
storage_ast.set(storage_ast.order_by, new_order_by_ast);
if (new_primary_key_ast.get() != data.primary_key_ast.get())
if (new_primary_key_ast.get() != primary_key_ast.get())
storage_ast.set(storage_ast.primary_key, new_primary_key_ast);
if (new_ttl_table_ast.get() != data.ttl_table_ast.get())
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
};
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, storage_modifier);
/// Reinitialize primary key because primary key column types might have changed.
data.setPrimaryKeyIndicesAndColumns(new_order_by_ast, new_primary_key_ast, new_columns, new_indices);
setPrimaryKeyIndicesAndColumns(new_order_by_ast, new_primary_key_ast, new_columns, new_indices);
data.setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
for (auto & transaction : transactions)
transaction->commit();
/// Columns sizes could be changed
data.recalculateColumnSizes();
recalculateColumnSizes();
}
@ -341,7 +341,7 @@ public:
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
MergeTreeMutationEntry entry(commands, full_path, data.insert_increment.get());
MergeTreeMutationEntry entry(commands, full_path, insert_increment.get());
String file_name;
{
std::lock_guard lock(currently_merging_mutex);
@ -362,7 +362,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
std::lock_guard lock(currently_merging_mutex);
std::vector<Int64> part_data_versions;
auto data_parts = data.getDataPartsVector();
auto data_parts = getDataPartsVector();
part_data_versions.reserve(data_parts.size());
for (const auto & part : data_parts)
part_data_versions.push_back(part->info.getDataVersion());
@ -471,7 +471,7 @@ bool StorageMergeTree::merge(
{
std::lock_guard lock(currently_merging_mutex);
auto can_merge = [this, &lock] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *)
{
return !currently_merging.count(left) && !currently_merging.count(right)
&& getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
@ -503,7 +503,7 @@ bool StorageMergeTree::merge(
/// Logging
Stopwatch stopwatch;
MergeTreeData::MutableDataPartPtr new_part;
MutableDataPartPtr new_part;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
@ -554,7 +554,7 @@ bool StorageMergeTree::merge(
future_part, *merge_entry, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
data.removeEmptyColumnsFromPart(new_part);
removeEmptyColumnsFromPart(new_part);
merging_tagger->is_successful = true;
write_part_log({});
@ -587,7 +587,7 @@ bool StorageMergeTree::tryMutatePart()
return false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : data.getDataPartsVector())
for (const auto & part : getDataPartsVector())
{
if (currently_merging.count(part))
continue;
@ -621,7 +621,7 @@ bool StorageMergeTree::tryMutatePart()
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part);
Stopwatch stopwatch;
MergeTreeData::MutableDataPartPtr new_part;
MutableDataPartPtr new_part;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
@ -670,7 +670,7 @@ bool StorageMergeTree::tryMutatePart()
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context);
data.renameTempPartAndReplace(new_part);
renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});
}
@ -698,11 +698,11 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
/// Clear old parts. It is unnecessary to do it more than once a second.
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
data.clearOldPartsFromFilesystem();
clearOldPartsFromFilesystem();
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(false, "");
data.clearOldTemporaryDirectories();
clearOldTemporaryDirectories();
}
clearOldMutations();
}
@ -729,7 +729,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
}
Int64 StorageMergeTree::getCurrentMutationVersion(
const MergeTreeData::DataPartPtr & part,
const DataPartPtr & part,
std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const
{
auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
@ -741,28 +741,28 @@ Int64 StorageMergeTree::getCurrentMutationVersion(
void StorageMergeTree::clearOldMutations()
{
if (!data.settings.finished_mutations_to_keep)
if (!settings.finished_mutations_to_keep)
return;
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::lock_guard lock(currently_merging_mutex);
if (current_mutations_by_version.size() <= data.settings.finished_mutations_to_keep)
if (current_mutations_by_version.size() <= settings.finished_mutations_to_keep)
return;
auto begin_it = current_mutations_by_version.begin();
std::optional<Int64> min_version = data.getMinPartDataVersion();
std::optional<Int64> min_version = getMinPartDataVersion();
auto end_it = current_mutations_by_version.end();
if (min_version)
end_it = current_mutations_by_version.upper_bound(*min_version);
size_t done_count = std::distance(begin_it, end_it);
if (done_count <= data.settings.finished_mutations_to_keep)
if (done_count <= settings.finished_mutations_to_keep)
return;
size_t to_delete_count = done_count - data.settings.finished_mutations_to_keep;
size_t to_delete_count = done_count - settings.finished_mutations_to_keep;
auto it = begin_it;
for (size_t i = 0; i < to_delete_count; ++i)
@ -790,10 +790,10 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
String partition_id = data.getPartitionIDFromQuery(partition, context);
auto parts = data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
String partition_id = getPartitionIDFromQuery(partition, context);
auto parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
std::vector<AlterDataPartTransactionPtr> transactions;
AlterCommand alter_command;
alter_command.type = AlterCommand::DROP_COLUMN;
@ -812,7 +812,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
if (part->info.partition_id != partition_id)
throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_indices.indices, false))
if (auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false))
transactions.push_back(std::move(transaction));
LOG_DEBUG(log, "Removing column " << get<String>(column_name) << " from part " << part->name);
@ -825,7 +825,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
transaction->commit();
/// Recalculate columns size (not only for the modified column)
data.recalculateColumnSizes();
recalculateColumnSizes();
}
@ -835,10 +835,10 @@ bool StorageMergeTree::optimize(
String disable_reason;
if (!partition && final)
{
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
DataPartsVector data_parts = getDataPartsVector();
std::unordered_set<String> partition_ids;
for (const MergeTreeData::DataPartPtr & part : data_parts)
for (const DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
@ -855,7 +855,7 @@ bool StorageMergeTree::optimize(
{
String partition_id;
if (partition)
partition_id = data.getPartitionIDFromQuery(partition, context);
partition_id = getPartitionIDFromQuery(partition, context);
if (!merge(true, partition_id, final, deduplicate, &disable_reason))
{
@ -895,7 +895,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
data.freezePartition(command.partition, command.with_name, context);
freezePartition(command.partition, command.with_name, context);
}
break;
@ -906,7 +906,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());
data.freezeAll(command.with_name, context);
freezeAll(command.with_name, context);
}
break;
@ -925,11 +925,11 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
/// Waits for completion of merge and does not start new ones.
auto lock = lockExclusively(context.getCurrentQueryId());
String partition_id = data.getPartitionIDFromQuery(partition, context);
String partition_id = getPartitionIDFromQuery(partition, context);
/// TODO: should we include PreComitted parts like in Replicated case?
auto parts_to_remove = data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
data.removePartsFromWorkingSet(parts_to_remove, true);
auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
removePartsFromWorkingSet(parts_to_remove, true);
if (detach)
{
@ -944,7 +944,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
LOG_INFO(log, (detach ? "Detached " : "Removed ") << parts_to_remove.size() << " parts inside partition ID " << partition_id << ".");
}
data.clearOldPartsFromFilesystem();
clearOldPartsFromFilesystem();
}
@ -957,7 +957,7 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
if (attach_part)
partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
else
partition_id = data.getPartitionIDFromQuery(partition, context);
partition_id = getPartitionIDFromQuery(partition, context);
String source_dir = "detached/";
@ -970,12 +970,12 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts(data.format_version);
ActiveDataPartSet active_parts(format_version);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
const String & name = it.name();
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version)
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
|| part_info.partition_id != partition_id)
{
continue;
@ -992,10 +992,10 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
String source_path = source_dir + source_part_name;
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
MutableDataPartPtr part = loadPartAndFixMetadata(source_path);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path);
data.renameTempPartAndAdd(part, &increment);
renameTempPartAndAdd(part, &increment);
LOG_INFO(log, "Finished attaching part");
}
@ -1010,22 +1010,22 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
MergeTreeData * src_data = data.checkStructureAndGetMergeTreeData(source_table);
String partition_id = data.getPartitionIDFromQuery(partition, context);
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
String partition_id = getPartitionIDFromQuery(partition, context);
MergeTreeData::DataPartsVector src_parts = src_data->getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
MergeTreeData::MutableDataPartsVector dst_parts;
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
MutableDataPartsVector dst_parts;
static const String TMP_PREFIX = "tmp_replace_from_";
for (const MergeTreeData::DataPartPtr & src_part : src_parts)
for (const DataPartPtr & src_part : src_parts)
{
/// This will generate unique name in scope of current server process.
Int64 temp_index = data.insert_increment.get();
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
dst_parts.emplace_back(data.cloneAndLoadDataPart(src_part, TMP_PREFIX, dst_part_info));
dst_parts.emplace_back(cloneAndLoadDataPart(src_part, TMP_PREFIX, dst_part_info));
}
/// ATTACH empty part set
@ -1047,19 +1047,19 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
{
/// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
/// and we should be able to rollback already added (Precomitted) parts
MergeTreeData::Transaction transaction(data);
Transaction transaction(*this);
auto data_parts_lock = data.lockParts();
auto data_parts_lock = lockParts();
/// Populate transaction
for (MergeTreeData::MutableDataPartPtr & part : dst_parts)
data.renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock);
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock);
transaction.commit(&data_parts_lock);
/// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block
if (replace)
data.removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
}
PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
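The replacePartitionFrom hunk above relies on MergeTreeData::Transaction as an RAII guard: parts renamed into the working set are registered with the transaction and rolled back automatically unless commit() runs. A stand-alone sketch of that rollback pattern, with hypothetical minimal types rather than the real Transaction class:

#include <algorithm>
#include <vector>

// Hypothetical minimal sketch: parts added through the transaction are undone in the
// destructor unless commit() was reached first.
struct PartTransaction
{
    std::vector<int> & working_set;
    std::vector<int> added;
    bool committed = false;

    explicit PartTransaction(std::vector<int> & working_set_) : working_set(working_set_) {}

    void addPart(int part)
    {
        working_set.push_back(part);
        added.push_back(part);
    }

    void commit() { committed = true; }

    ~PartTransaction()
    {
        if (committed)
            return;
        // An exception thrown between addPart() and commit() lands here and undoes the additions.
        for (int part : added)
            working_set.erase(std::remove(working_set.begin(), working_set.end(), part), working_set.end());
    }
};

This mirrors how the code wraps renameTempPartAndReplace() in Transaction transaction(*this) and only calls transaction.commit(&data_parts_lock) after every part is in place.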

View File

@ -20,34 +20,18 @@ namespace DB
/** See the description of the data structure in MergeTreeData.
*/
class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public IStorage
class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public MergeTreeData
{
public:
void startup() override;
void shutdown() override;
~StorageMergeTree() override;
std::string getName() const override { return data.merging_params.getModeName() + "MergeTree"; }
std::string getName() const override { return merging_params.getModeName() + "MergeTree"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const override { return database_name; }
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & /* query_context */) const override
{
return data.mayBenefitFromIndexForIn(left_in_operand);
}
const ColumnsDescription & getColumns() const override { return data.getColumns(); }
void setColumns(ColumnsDescription columns_) override { return data.setColumns(std::move(columns_)); }
virtual const IndicesDescription & getIndices() const override { return data.getIndices(); }
virtual void setIndices(IndicesDescription indices_) override { data.setIndices(std::move(indices_)); }
NameAndTypePair getColumn(const String & column_name) const override { return data.getColumn(column_name); }
bool hasColumn(const String & column_name) const override { return data.hasColumn(column_name); }
BlockInputStreams read(
const Names & column_names,
@ -66,7 +50,7 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
void drop() override;
@ -84,32 +68,13 @@ public:
ActionLock getActionLock(StorageActionBlockType action_type) override;
MergeTreeData & getData() { return data; }
const MergeTreeData & getData() const { return data; }
String getDataPath() const override { return full_path; }
ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); }
ASTPtr getPrimaryKeyAST() const override { return data.getPrimaryKeyAST(); }
ASTPtr getSamplingKeyAST() const override { return data.getSamplingExpression(); }
Names getColumnsRequiredForPartitionKey() const override { return data.getColumnsRequiredForPartitionKey(); }
Names getColumnsRequiredForSortingKey() const override { return data.getColumnsRequiredForSortingKey(); }
Names getColumnsRequiredForPrimaryKey() const override { return data.getColumnsRequiredForPrimaryKey(); }
Names getColumnsRequiredForSampling() const override { return data.getColumnsRequiredForSampling(); }
Names getColumnsRequiredForFinal() const override { return data.getColumnsRequiredForSortingKey(); }
private:
String path;
String database_name;
String table_name;
String full_path;
Context global_context;
BackgroundProcessingPool & background_pool;
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
@ -121,12 +86,10 @@ private:
AtomicStopwatch time_after_previous_cleanup;
mutable std::mutex currently_merging_mutex;
MergeTreeData::DataParts currently_merging;
DataParts currently_merging;
std::map<String, MergeTreeMutationEntry> current_mutations_by_id;
std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;
Logger * log;
std::atomic<bool> shutdown_called {false};
BackgroundProcessingPool::TaskHandle background_task_handle;
@ -137,8 +100,7 @@ private:
* If aggressive, parts are selected without taking their size ratio and novelty into account (used for the OPTIMIZE query).
* Returns true if the merge finished successfully.
*/
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate,
String * out_disable_reason = nullptr);
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr);
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();
@ -146,7 +108,7 @@ private:
BackgroundProcessingPoolTaskResult backgroundTask();
Int64 getCurrentMutationVersion(
const MergeTreeData::DataPartPtr & part,
const DataPartPtr & part,
std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const;
void clearOldMutations();
@ -182,7 +144,7 @@ protected:
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergeTreeData::MergingParams & merging_params_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
};

File diff suppressed because it is too large

View File

@ -72,36 +72,20 @@ namespace DB
* as the time will take the time of creation the appropriate part on any of the replicas.
*/
class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicatedMergeTree>, public IStorage
class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicatedMergeTree>, public MergeTreeData
{
public:
void startup() override;
void shutdown() override;
~StorageReplicatedMergeTree() override;
std::string getName() const override { return "Replicated" + data.merging_params.getModeName() + "MergeTree"; }
std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const override { return database_name; }
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
const ColumnsDescription & getColumns() const override { return data.getColumns(); }
void setColumns(ColumnsDescription columns_) override { return data.setColumns(std::move(columns_)); }
NameAndTypePair getColumn(const String & column_name) const override
{
return data.getColumn(column_name);
}
bool hasColumn(const String & column_name) const override
{
return data.hasColumn(column_name);
}
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -121,7 +105,7 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
@ -133,10 +117,6 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & /* query_context */) const override
{
return data.mayBenefitFromIndexForIn(left_in_operand);
}
void checkTableCanBeDropped() const override;
@ -148,10 +128,6 @@ public:
/// If the timeout is exceeded, returns false
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
MergeTreeData & getData() { return data; }
const MergeTreeData & getData() const { return data; }
/** For the system table replicas. */
struct Status
{
@ -194,17 +170,6 @@ public:
String getDataPath() const override { return full_path; }
ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); }
ASTPtr getPrimaryKeyAST() const override { return data.getPrimaryKeyAST(); }
ASTPtr getSamplingKeyAST() const override { return data.getSamplingExpression(); }
Names getColumnsRequiredForPartitionKey() const override { return data.getColumnsRequiredForPartitionKey(); }
Names getColumnsRequiredForSortingKey() const override { return data.getColumnsRequiredForSortingKey(); }
Names getColumnsRequiredForPrimaryKey() const override { return data.getColumnsRequiredForPrimaryKey(); }
Names getColumnsRequiredForSampling() const override { return data.getColumnsRequiredForSampling(); }
Names getColumnsRequiredForFinal() const override { return data.getColumnsRequiredForSortingKey(); }
private:
/// Delete old parts from disk and from ZooKeeper.
void clearOldPartsAndRemoveFromZK();
@ -222,8 +187,6 @@ private:
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
Context global_context;
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
@ -234,10 +197,6 @@ private:
/// If true, the table is offline and cannot be written to.
std::atomic_bool is_readonly {false};
String database_name;
String table_name;
String full_path;
String zookeeper_path;
String replica_name;
String replica_path;
@ -264,7 +223,6 @@ private:
InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
@ -325,8 +283,6 @@ private:
/// An event that awakens `alter` method from waiting for the completion of the ALTER query.
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();
Logger * log;
/** Creates the minimum set of nodes in ZooKeeper.
*/
void createTableIfNotExists();
@ -362,24 +318,24 @@ private:
* Adds actions to `ops` that add data about the part into ZooKeeper.
* Call under TableStructureLock.
*/
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const MergeTreeData::DataPartPtr & part,
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part,
Coordination::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
/// Accepts a PreCommitted part, atomically checks its checksums against those on other replicas and commits the part
MergeTreeData::DataPartsVector checkPartChecksumsAndCommit(MergeTreeData::Transaction & transaction,
const MergeTreeData::DataPartPtr & part);
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction,
const DataPartPtr & part);
void getCommitPartOps(
Coordination::Requests & ops,
MergeTreeData::MutableDataPartPtr & part,
MutableDataPartPtr & part,
const String & block_id_path = "") const;
/// Updates info about part columns and checksums in ZooKeeper and commits the transaction if successful.
void updatePartHeaderInZooKeeperAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
MergeTreeData::AlterDataPartTransaction & transaction);
AlterDataPartTransaction & transaction);
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
@ -390,7 +346,7 @@ private:
NameSet * parts_should_be_retried = nullptr);
bool tryRemovePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
bool tryRemovePartsFromZooKeeperWithRetries(MergeTreeData::DataPartsVector & parts, size_t max_retries = 5);
bool tryRemovePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is intended to be used for broken parts.
void removePartAndEnqueueFetch(const String & part_name);
@ -405,8 +361,8 @@ private:
void writePartLog(
PartLogElement::Type type, const ExecutionStatus & execution_status, UInt64 elapsed_ns,
const String & new_part_name,
const MergeTreeData::DataPartPtr & result_part,
const MergeTreeData::DataPartsVector & source_parts,
const DataPartPtr & result_part,
const DataPartsVector & source_parts,
const MergeListEntry * merge_entry);
void executeDropRange(const LogEntry & entry);
@ -463,7 +419,7 @@ private:
*/
bool createLogEntryToMergeParts(
zkutil::ZooKeeperPtr & zookeeper,
const MergeTreeData::DataPartsVector & parts,
const DataPartsVector & parts,
const String & merged_name,
bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
@ -564,7 +520,7 @@ protected:
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & table_ttl_ast_,
const MergeTreeData::MergingParams & merging_params_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
};
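
The removals above follow one rule: anything that used to be forwarded through the `data` member (merging_params, supportsSampling(), the key ASTs, the column accessors) is now inherited directly, because the storage itself is a MergeTreeData. A sketch of the resulting shape, again with stand-in names rather than the real classes:

#include <string>

struct MergeTreeDataLike
{
    struct MergingParams { std::string getModeName() const { return "Summing"; } };
    MergingParams merging_params;
    bool supportsSampling() const { return true; }
};

struct StorageReplicatedMergeTreeLike : MergeTreeDataLike
{
    // No data.merging_params indirection: the member comes from the base class.
    std::string getName() const { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
};
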

View File

@ -1,8 +1,6 @@
#include <optional>
#include <Storages/System/StorageSystemColumns.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
@ -124,16 +122,10 @@ protected:
cols_required_for_sampling = storage->getColumnsRequiredForSampling();
/** Info about sizes of columns for tables of MergeTree family.
* NOTE: It is possible to add getter for this info to IStorage interface.
*/
if (auto storage_concrete_plain = dynamic_cast<StorageMergeTree *>(storage.get()))
{
column_sizes = storage_concrete_plain->getData().getColumnSizes();
}
else if (auto storage_concrete_replicated = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()))
{
column_sizes = storage_concrete_replicated->getData().getColumnSizes();
}
* NOTE: It is possible to add getter for this info to IStorage interface.
*/
if (auto storage_concrete = dynamic_cast<const MergeTreeData *>(storage.get()))
column_sizes = storage_concrete->getColumnSizes();
}
for (const auto & column : columns)
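
The hunk above is the caller-side half of the refactoring: two engine-specific casts plus getData() collapse into a single cast to the shared base. A hedged sketch of the resulting pattern; tryGetMergeTreeData is a hypothetical helper (not added by this commit), and it assumes StoragePtr is visible through IStorage.h as in the files touched here.

#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>

namespace DB
{

// Hypothetical convenience wrapper: returns nullptr for engines outside the MergeTree family.
inline const MergeTreeData * tryGetMergeTreeData(const StoragePtr & storage)
{
    return dynamic_cast<const MergeTreeData *>(storage.get());
}

}
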

View File

@ -1,7 +1,5 @@
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
@ -37,20 +35,10 @@ StorageSystemGraphite::Configs StorageSystemGraphite::getConfigs(const Context &
for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next())
{
auto & table = iterator->table();
const MergeTreeData * table_data = nullptr;
if (const StorageMergeTree * merge_tree = dynamic_cast<StorageMergeTree *>(table.get()))
{
table_data = &merge_tree->getData();
}
else if (const StorageReplicatedMergeTree * replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
table_data = &replicated_merge_tree->getData();
}
else
{
const MergeTreeData * table_data = dynamic_cast<const MergeTreeData *>(table.get());
if (!table_data)
continue;
}
if (table_data->merging_params.mode == MergeTreeData::MergingParams::Graphite)
{

View File

@ -4,8 +4,8 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
@ -38,19 +38,10 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
/// Collect a set of *MergeTree tables.
std::map<String, std::map<String, StoragePtr>> merge_tree_tables;
for (const auto & db : context.getDatabases())
{
if (context.hasDatabaseAccessRights(db.first))
{
for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next())
{
if (dynamic_cast<const StorageMergeTree *>(iterator->table().get())
|| dynamic_cast<const StorageReplicatedMergeTree *>(iterator->table().get()))
{
if (dynamic_cast<const MergeTreeData *>(iterator->table().get()))
merge_tree_tables[db.first][iterator->name()] = iterator->table();
}
}
}
}
MutableColumnPtr col_database_mut = ColumnString::create();
MutableColumnPtr col_table_mut = ColumnString::create();
@ -92,10 +83,8 @@ void StorageSystemMutations::fillData(MutableColumns & res_columns, const Contex
std::vector<MergeTreeMutationStatus> statuses;
{
const IStorage * storage = merge_tree_tables[database][table].get();
if (const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage))
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage))
statuses = merge_tree->getMutationsStatus();
else if (const auto * replicated = dynamic_cast<const StorageReplicatedMergeTree *>(storage))
statuses = replicated->getMutationsStatus();
}
for (const MergeTreeMutationStatus & status : statuses)
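
The loop above fills the system table from whatever getMutationsStatus() returns through the MergeTreeData pointer; that only works if the method is virtual in the base class, which the `override` added in StorageReplicatedMergeTree.h earlier in this diff suggests. A stand-in sketch of that dispatch (hypothetical names, assumed declaration):

#include <string>
#include <vector>

struct MutationStatusLike { std::string id; };

struct MergeTreeDataLike
{
    virtual ~MergeTreeDataLike() = default;
    // Assumed: the real MergeTreeData declares an equivalent virtual method.
    virtual std::vector<MutationStatusLike> getMutationsStatus() const = 0;
};

struct StorageReplicatedMergeTreeLike : MergeTreeDataLike
{
    std::vector<MutationStatusLike> getMutationsStatus() const override
    {
        return { MutationStatusLike{"0000000000"} };
    }
};
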

View File

@ -6,8 +6,6 @@
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemParts.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>

View File

@ -7,8 +7,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
@ -93,8 +92,7 @@ public:
StoragePtr storage = iterator->table();
String engine_name = storage->getName();
if (!dynamic_cast<StorageMergeTree *>(&*storage) &&
!dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
if (!dynamic_cast<MergeTreeData *>(storage.get()))
continue;
storages[std::make_pair(database_name, iterator->name())] = storage;
@ -184,20 +182,9 @@ public:
info.engine = info.storage->getName();
info.data = nullptr;
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*info.storage))
{
info.data = &merge_tree->getData();
}
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*info.storage))
{
info.data = &replicated_merge_tree->getData();
}
else
{
info.data = dynamic_cast<MergeTreeData *>(info.storage.get());
if (!info.data)
throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR);
}
using State = MergeTreeDataPart::State;
auto & all_parts_state = info.all_parts_state;

View File

@ -6,7 +6,6 @@
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemPartsColumns.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>