ClickHouse/dbms/src/Storages/StorageMergeTree.cpp

#include <experimental/optional>
#include <Core/FieldVisitors.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeList.h>
#include <Databases/IDatabase.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
StorageMergeTree::StorageMergeTree(
const String & path_,
const String & database_name_,
const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
bool attach,
Context & context_,
const ASTPtr & primary_expr_ast_,
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, false, attach),
reader(data), writer(data), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{
data.loadDataParts(has_force_restore_data_flag);
if (!attach)
{
if (!data.getDataParts().empty())
throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);
}
else
{
data.clearOldParts();
}
/// Temporary directories contain incomplete results of merges (after a forced restart)
/// and cannot be resumed, so delete each of them immediately.
data.clearOldTemporaryDirectories(0);
increment.set(data.getMaxDataPartIndex());
}
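/// Register a periodic task in the shared background pool; the pool will call
/// mergeTask() repeatedly until the task is removed in shutdown().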
void StorageMergeTree::startup()
{
merge_task_handle = background_pool.addTask([this] { return mergeTask(); });
}
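/// Idempotent: repeated calls are no-ops. Cancels any merge in progress and
/// unregisters the background task so that no new merges are started.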
void StorageMergeTree::shutdown()
{
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelForever();
if (merge_task_handle)
background_pool.removeTask(merge_task_handle);
}
StorageMergeTree::~StorageMergeTree()
{
shutdown();
}
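/// SELECT is delegated to the reader (a MergeTreeDataSelectExecutor), which
/// prunes partitions and chooses the parts and mark ranges to scan.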
BlockInputStreams StorageMergeTree::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
return reader.read(column_names, query_info, context, processed_stage, max_block_size, num_streams, nullptr, 0);
}
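/// Each INSERT creates one or more new immutable parts; merging them into
/// larger parts happens later, in the background (see mergeTask below).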
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & query, const Settings & settings)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this);
}
bool StorageMergeTree::checkTableCanBeDropped() const
{
const_cast<MergeTreeData &>(getData()).recalculateColumnSizes();
context.checkTableCanBeDropped(database_name, table_name, getData().getTotalCompressedSize());
return true;
}
void StorageMergeTree::drop()
{
shutdown();
data.dropAllData();
}
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
data.setPath(new_full_path, true);
path = new_path_to_db;
table_name = new_table_name;
full_path = new_full_path;
/// NOTE: Logger names are not updated.
}
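/// ALTER flow: block merges, prepare an AlterDataPartTransaction for every data
/// part, update the table metadata, then commit all per-part transactions.
/// MODIFY PRIMARY KEY additionally rewrites the engine description in the
/// metadata and reloads the data parts.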
void StorageMergeTree::alter(
const AlterCommands & params,
const String & database_name,
const String & table_name,
const Context & context)
{
/// NOTE: Here, as in ReplicatedMergeTree, it is possible to do an ALTER that does not block data writes for a long time.
auto merge_blocker = merger.cancel();
auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);
data.checkAlter(params);
auto new_columns = data.getColumnsListNonMaterialized();
auto new_materialized_columns = data.materialized_columns;
auto new_alias_columns = data.alias_columns;
auto new_column_defaults = data.column_defaults;
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
auto columns_for_parts = new_columns;
columns_for_parts.insert(std::end(columns_for_parts),
std::begin(new_materialized_columns), std::end(new_materialized_columns));
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
bool primary_key_is_modified = false;
ASTPtr new_primary_key_ast = data.primary_expr_ast;
for (const AlterCommand & param : params)
{
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
{
primary_key_is_modified = true;
new_primary_key_ast = param.primary_key;
}
}
if (primary_key_is_modified && data.merging_params.mode == MergeTreeData::MergingParams::Unsorted)
throw Exception("UnsortedMergeTree cannot have primary key", ErrorCodes::BAD_ARGUMENTS);
if (primary_key_is_modified && supportsSampling())
throw Exception("MODIFY PRIMARY KEY only supported for tables without sampling key", ErrorCodes::BAD_ARGUMENTS);
MergeTreeData::DataParts parts = data.getAllDataParts();
for (const MergeTreeData::DataPartPtr & part : parts)
{
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_primary_key_ast, false))
transactions.push_back(std::move(transaction));
}
auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__);
IDatabase::ASTModifier engine_modifier;
if (primary_key_is_modified)
engine_modifier = [&new_primary_key_ast] (ASTPtr & engine_ast)
{
auto tuple = std::make_shared<ASTFunction>(new_primary_key_ast->range);
tuple->name = "tuple";
tuple->arguments = new_primary_key_ast;
tuple->children.push_back(tuple->arguments);
/// The primary key is in second place in the table engine description and can be represented as a tuple.
/// TODO: It is not always in second place: if there is a sampling key, it is in third. Fix this.
typeid_cast<ASTExpressionList &>(*typeid_cast<ASTFunction &>(*engine_ast).arguments).children.at(1) = tuple;
};
context.getDatabase(database_name)->alterTable(
context, table_name,
new_columns, new_materialized_columns, new_alias_columns, new_column_defaults,
engine_modifier);
materialized_columns = new_materialized_columns;
alias_columns = new_alias_columns;
column_defaults = new_column_defaults;
data.setColumnsList(new_columns);
data.materialized_columns = std::move(new_materialized_columns);
data.alias_columns = std::move(new_alias_columns);
data.column_defaults = std::move(new_column_defaults);
if (primary_key_is_modified)
{
data.primary_expr_ast = new_primary_key_ast;
}
/// Reinitialize primary key because primary key column types might have changed.
data.initPrimaryKey();
for (auto & transaction : transactions)
transaction->commit();
/// Columns sizes could be changed
data.recalculateColumnSizes();
if (primary_key_is_modified)
data.loadDataParts(false);
}
/// While this object exists, it marks the parts as 'currently merging' and reserves free space on the filesystem.
/// Tagging a part that is already tagged is treated as a logic error.
struct CurrentlyMergingPartsTagger
{
MergeTreeData::DataPartsVector parts;
DiskSpaceMonitor::ReservationPtr reserved_space;
StorageMergeTree * storage = nullptr;
CurrentlyMergingPartsTagger() = default;
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(&storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage->full_path, total_size); /// May throw.
for (const auto & part : parts)
{
if (storage->currently_merging.count(part))
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage->currently_merging.insert(parts.begin(), parts.end());
}
~CurrentlyMergingPartsTagger()
{
std::lock_guard<std::mutex> lock(storage->currently_merging_mutex);
for (const auto & part : parts)
{
if (!storage->currently_merging.count(part))
std::terminate();
storage->currently_merging.erase(part);
}
}
};
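/// Selects parts to merge (under currently_merging_mutex), tags them with a
/// CurrentlyMergingPartsTagger, performs the merge outside the lock, and records
/// the result in the part log. Returns false if nothing was selected.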
bool StorageMergeTree::merge(
size_t aio_threshold,
bool aggressive,
const String & partition_id,
bool final,
bool deduplicate)
{
/// Clear old parts. There is no point in doing this more often than once a second.
if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1))
{
data.clearOldParts();
data.clearOldTemporaryDirectories();
}
auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__);
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
MergeTreeDataMerger::FuturePart future_part;
/// The destructor must be called with `currently_merging_mutex` unlocked.
std::experimental::optional<CurrentlyMergingPartsTagger> merging_tagger;
{
std::lock_guard<std::mutex> lock(currently_merging_mutex);
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return !currently_merging.count(left) && !currently_merging.count(right);
};
bool selected = false;
if (partition_id.empty())
{
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge();
if (max_parts_size_for_merge > 0)
selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge);
}
else
{
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final);
}
if (!selected)
return false;
merging_tagger.emplace(future_part.parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(future_part.parts), *this);
}
MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, future_part.name, future_part.parts);
/// Logging
Stopwatch stopwatch;
auto new_part = merger.mergePartsToTemporaryPart(
future_part, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get(), deduplicate);
merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
if (auto part_log = context.getPartLog(database_name, table_name))
{
PartLogElement elem;
elem.event_time = time(nullptr);
elem.merged_from.reserve(future_part.parts.size());
for (const auto & part : future_part.parts)
elem.merged_from.push_back(part->name);
elem.event_type = PartLogElement::MERGE_PARTS;
elem.size_in_bytes = new_part->size_in_bytes;
elem.database_name = new_part->storage.getDatabaseName();
elem.table_name = new_part->storage.getTableName();
elem.part_name = new_part->name;
elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log->add(elem);
elem.duration_ms = 0;
elem.event_type = PartLogElement::REMOVE_PART;
elem.merged_from = Strings();
for (const auto & part : future_part.parts)
{
elem.part_name = part->name;
elem.size_in_bytes = part->size_in_bytes;
part_log->add(elem);
}
}
return true;
}
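/// Entry point for the background pool. Returns whether a merge was performed;
/// the pool uses the return value to decide whether to retry immediately or back off.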
bool StorageMergeTree::mergeTask()
{
if (shutdown_called)
return false;
try
{
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
return merge(aio_threshold, false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/); ///TODO: read deduplicate option from table config
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(log, e.message());
return false;
}
throw;
}
}
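/// Implements ALTER TABLE ... CLEAR COLUMN IN PARTITION: rewrites each part of
/// the partition without the column, reusing the per-part ALTER machinery
/// (a DROP_COLUMN command applied part by part).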
void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Field & column_name, const Context & context)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
String partition_id = data.getPartitionIDFromQuery(partition, context);
MergeTreeData::DataParts parts = data.getDataParts();
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
AlterCommand alter_command;
alter_command.type = AlterCommand::DROP_COLUMN;
alter_command.column_name = get<String>(column_name);
auto new_columns = data.getColumnsListNonMaterialized();
auto new_materialized_columns = data.materialized_columns;
auto new_alias_columns = data.alias_columns;
auto new_column_defaults = data.column_defaults;
alter_command.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
auto columns_for_parts = new_columns;
columns_for_parts.insert(std::end(columns_for_parts),
std::begin(new_materialized_columns), std::end(new_materialized_columns));
for (const auto & part : parts)
{
if (part->info.partition_id != partition_id)
continue;
if (auto transaction = data.alterDataPart(part, columns_for_parts, data.primary_expr_ast, false))
transactions.push_back(std::move(transaction));
LOG_DEBUG(log, "Removing column " << get<String>(column_name) << " from part " << part->name);
}
if (transactions.empty())
return;
for (auto & transaction : transactions)
transaction->commit();
data.recalculateColumnSizes();
}
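/// Implements OPTIMIZE TABLE [PARTITION ...] [FINAL] [DEDUPLICATE]:
/// an unscheduled, aggressive merge of the given partition (or of any suitable parts).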
bool StorageMergeTree::optimize(
const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
{
String partition_id;
if (partition)
partition_id = data.getPartitionIDFromQuery(partition, context);
return merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate);
}
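/// Implements ALTER TABLE ... DROP|DETACH PARTITION. With `detach`, the parts
/// are moved to the detached/ directory instead of being deleted, so they can
/// be brought back later with ATTACH PARTITION.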
void StorageMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & context)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter(__PRETTY_FUNCTION__);
String partition_id = data.getPartitionIDFromQuery(partition, context);
size_t removed_parts = 0;
MergeTreeData::DataParts parts = data.getDataParts();
for (const auto & part : parts)
{
if (part->info.partition_id != partition_id)
continue;
LOG_DEBUG(log, "Removing part " << part->name);
++removed_parts;
if (detach)
data.renameAndDetachPart(part, "");
else
data.replaceParts({part}, {}, false);
}
LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " parts inside partition ID " << partition_id << ".");
}
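/// Implements ALTER TABLE ... ATTACH PARTITION|PART: picks up parts from the
/// detached/ directory, checks and fixes their metadata, and adds them to the
/// working set. When attaching a whole partition, only the active
/// (non-overlapped) parts found in detached/ are taken.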
void StorageMergeTree::attachPartition(const ASTPtr & partition, bool part, const Context & context)
{
String partition_id;
if (part)
partition_id = typeid_cast<const ASTLiteral &>(*partition).value.safeGet<String>();
else
partition_id = data.getPartitionIDFromQuery(partition, context);
String source_dir = "detached/";
/// Let's make a list of parts to add.
Strings parts;
if (part)
{
parts.push_back(partition_id);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts(data.format_version);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
const String & name = it.name();
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version)
|| part_info.partition_id != partition_id)
{
continue;
}
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
for (const auto & source_part_name : parts)
{
String source_path = source_dir + source_part_name;
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path);
data.renameTempPartAndAdd(part, &increment);
LOG_INFO(log, "Finished attaching part");
}
/// New parts with other data may appear in place of deleted parts.
context.dropCaches();
}
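/// Implements ALTER TABLE ... FREEZE PARTITION: creates a local snapshot of the
/// partition's parts (done in MergeTreeData; the snapshot consists of hard links,
/// so it is cheap and does not duplicate data).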
void StorageMergeTree::freezePartition(const ASTPtr & partition, const String & with_name, const Context & context)
{
data.freezePartition(partition, with_name, context);
}
}