ClickHouse/dbms/src/Storages/StorageMergeTree.cpp

544 lines
19 KiB
C++
Raw Normal View History

2016-09-02 04:03:40 +00:00
#include <experimental/optional>
#include <Core/FieldVisitors.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeList.h>
#include <Databases/IDatabase.h>
#include <Common/escapeForFileName.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/MergeTree/MergeTreeData.h>
Squashed commit of the following: commit e712f469a55ff34ad34b482b15cc4153b7ad7233 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:59:13 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a002823084e3a79bffcc17d479620a68eb0644b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:58:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9e06f407c8ee781ed8ddf98bdfcc31846bf2a0fe Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:55:14 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9581620f1e839f456fa7894aa1f996d5162ac6cd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:54:22 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a8564c68cb6cc3649fafaf401256d43c9a2e777 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:47:34 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit cf60632d78ec656be3304ef4565e859bb6ce80ba Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:40:09 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit ee3d1dc6e0c4ca60e3ac1e0c30d4b3ed1e66eca0 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:22:49 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 65592ef7116a90104fcd524b53ef8b7cf22640f2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:18:17 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 37972c257320d3b7e7b294e0fdeffff218647bfd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:17:06 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit dd909d149974ce5bed2456de1261aa5a368fd3ff Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:16:28 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 3cf43266ca7e30adf01212b1a739ba5fe43639fd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:15:42 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 6731a3df96d1609286e2536b6432916af7743f0f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:13:35 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 1b5727e0d56415b7add4cb76110105358663602c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:11:18 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit bbcf726a55685b8e72f5b40ba0bf1904bd1c0407 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:09:04 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit c03b477d5e2e65014e8906ecfa2efb67ee295af1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:06:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2986e2fb0466bc18d73693dcdded28fccc0dc66b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:05:44 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 5d6cdef13d2e02bd5c4954983334e9162ab2635b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:04:53 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit f2b819b25ce8b2ccdcb201eefb03e1e6f5aab590 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:01:47 2017 +0300 Less dependencies [#CLICKHOUSE-2]
2017-01-14 09:00:19 +00:00
2015-04-16 06:12:35 +00:00
#include <Poco/DirectoryIterator.h>
Squashed commit of the following: commit e712f469a55ff34ad34b482b15cc4153b7ad7233 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:59:13 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a002823084e3a79bffcc17d479620a68eb0644b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:58:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9e06f407c8ee781ed8ddf98bdfcc31846bf2a0fe Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:55:14 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9581620f1e839f456fa7894aa1f996d5162ac6cd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:54:22 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a8564c68cb6cc3649fafaf401256d43c9a2e777 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:47:34 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit cf60632d78ec656be3304ef4565e859bb6ce80ba Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:40:09 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit ee3d1dc6e0c4ca60e3ac1e0c30d4b3ed1e66eca0 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:22:49 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 65592ef7116a90104fcd524b53ef8b7cf22640f2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:18:17 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 37972c257320d3b7e7b294e0fdeffff218647bfd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:17:06 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit dd909d149974ce5bed2456de1261aa5a368fd3ff Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:16:28 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 3cf43266ca7e30adf01212b1a739ba5fe43639fd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:15:42 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 6731a3df96d1609286e2536b6432916af7743f0f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:13:35 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 1b5727e0d56415b7add4cb76110105358663602c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:11:18 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit bbcf726a55685b8e72f5b40ba0bf1904bd1c0407 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:09:04 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit c03b477d5e2e65014e8906ecfa2efb67ee295af1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:06:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2986e2fb0466bc18d73693dcdded28fccc0dc66b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:05:44 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 5d6cdef13d2e02bd5c4954983334e9162ab2635b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:04:53 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit f2b819b25ce8b2ccdcb201eefb03e1e6f5aab590 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:01:47 2017 +0300 Less dependencies [#CLICKHOUSE-2]
2017-01-14 09:00:19 +00:00
#include <Poco/File.h>
2012-07-19 20:32:10 +00:00
2012-07-17 20:04:39 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
extern const int BAD_ARGUMENTS;
}
StorageMergeTree::StorageMergeTree(
const String & path_,
const String & database_name_,
const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
bool attach,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name,
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, false, attach),
reader(data), writer(data, context), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
2014-03-13 12:48:07 +00:00
{
data.loadDataParts(has_force_restore_data_flag);
data.clearOldParts();
/// Temporary directories contain unfinalized results of Merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
data.clearOldTemporaryDirectories(0);
increment.set(data.getMaxDataPartIndex());
2014-03-13 12:48:07 +00:00
}
2012-07-17 20:04:39 +00:00
StoragePtr StorageMergeTree::create(
const String & path_, const String & database_name_, const String & table_name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
bool attach,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag_,
const MergeTreeSettings & settings_)
{
auto res = make_shared(
path_, database_name_, table_name_,
columns_, materialized_columns_, alias_columns_, column_defaults_, attach,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, merging_params_, has_force_restore_data_flag_, settings_
);
res->merge_task_handle = res->background_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res.get()));
return res;
}
2014-11-12 10:37:47 +00:00
2013-09-30 01:29:19 +00:00
void StorageMergeTree::shutdown()
2012-07-30 20:32:36 +00:00
{
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelForever();
background_pool.removeTask(merge_task_handle);
2012-07-18 19:44:04 +00:00
}
2014-03-13 12:48:07 +00:00
StorageMergeTree::~StorageMergeTree()
{
shutdown();
2014-03-13 12:48:07 +00:00
}
2012-07-18 19:44:04 +00:00
2012-07-21 05:07:14 +00:00
BlockInputStreams StorageMergeTree::read(
const Names & column_names,
const ASTPtr & query,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
2017-06-02 15:54:39 +00:00
const unsigned num_streams)
2012-07-21 05:07:14 +00:00
{
2017-06-02 15:54:39 +00:00
return reader.read(column_names, query, context, processed_stage, max_block_size, num_streams, nullptr, 0);
2012-12-06 09:45:09 +00:00
}
2017-05-21 22:25:25 +00:00
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & query, const Settings & settings)
2013-01-23 11:16:32 +00:00
{
return std::make_shared<MergeTreeBlockOutputStream>(*this);
2013-01-23 11:16:32 +00:00
}
bool StorageMergeTree::checkTableCanBeDropped() const
2012-08-16 18:17:01 +00:00
{
const_cast<MergeTreeData &>(getData()).recalculateColumnSizes();
context.checkTableCanBeDropped(database_name, table_name, getData().getTotalCompressedSize());
return true;
}
void StorageMergeTree::drop()
{
shutdown();
data.dropAllData();
2013-08-07 13:07:42 +00:00
}
void StorageMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
2014-03-13 19:14:25 +00:00
data.setPath(new_full_path, true);
2014-03-13 12:48:07 +00:00
path = new_path_to_db;
table_name = new_table_name;
full_path = new_full_path;
2014-03-13 12:48:07 +00:00
/// NOTE: Logger names are not updated.
}
void StorageMergeTree::alter(
const AlterCommands & params,
const String & database_name,
const String & table_name,
const Context & context)
2013-08-07 13:07:42 +00:00
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger.cancel();
auto table_soft_lock = lockDataForAlter();
data.checkAlter(params);
auto new_columns = data.getColumnsListNonMaterialized();
auto new_materialized_columns = data.materialized_columns;
auto new_alias_columns = data.alias_columns;
auto new_column_defaults = data.column_defaults;
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
auto columns_for_parts = new_columns;
columns_for_parts.insert(std::end(columns_for_parts),
std::begin(new_materialized_columns), std::end(new_materialized_columns));
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
bool primary_key_is_modified = false;
ASTPtr new_primary_key_ast = data.primary_expr_ast;
for (const AlterCommand & param : params)
{
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
{
primary_key_is_modified = true;
new_primary_key_ast = param.primary_key;
}
}
if (primary_key_is_modified && data.merging_params.mode == MergeTreeData::MergingParams::Unsorted)
throw Exception("UnsortedMergeTree cannot have primary key", ErrorCodes::BAD_ARGUMENTS);
if (primary_key_is_modified && supportsSampling())
throw Exception("MODIFY PRIMARY KEY only supported for tables without sampling key", ErrorCodes::BAD_ARGUMENTS);
MergeTreeData::DataParts parts = data.getAllDataParts();
for (const MergeTreeData::DataPartPtr & part : parts)
if (auto transaction = data.alterDataPart(part, columns_for_parts, new_primary_key_ast, false))
transactions.push_back(std::move(transaction));
auto table_hard_lock = lockStructureForAlter();
IDatabase::ASTModifier engine_modifier;
if (primary_key_is_modified)
engine_modifier = [&new_primary_key_ast] (ASTPtr & engine_ast)
{
auto tuple = std::make_shared<ASTFunction>(new_primary_key_ast->range);
tuple->name = "tuple";
tuple->arguments = new_primary_key_ast;
tuple->children.push_back(tuple->arguments);
/// Primary key is in the second place in table engine description and can be represented as a tuple.
/// TODO: Not always in second place. If there is a sampling key, then the third one. Fix it.
typeid_cast<ASTExpressionList &>(*typeid_cast<ASTFunction &>(*engine_ast).arguments).children.at(1) = tuple;
};
context.getDatabase(database_name)->alterTable(
context, table_name,
new_columns, new_materialized_columns, new_alias_columns, new_column_defaults,
engine_modifier);
materialized_columns = new_materialized_columns;
alias_columns = new_alias_columns;
column_defaults = new_column_defaults;
data.setColumnsList(new_columns);
data.materialized_columns = std::move(new_materialized_columns);
data.alias_columns = std::move(new_alias_columns);
data.column_defaults = std::move(new_column_defaults);
if (primary_key_is_modified)
{
data.primary_expr_ast = new_primary_key_ast;
}
/// Reinitialize primary key because primary key column types might have changed.
data.initPrimaryKey();
for (auto & transaction : transactions)
transaction->commit();
/// Columns sizes could be changed
data.recalculateColumnSizes();
if (primary_key_is_modified)
data.loadDataParts(false);
2014-03-20 13:00:42 +00:00
}
2016-09-02 04:03:40 +00:00
/// While exists, marks parts as 'currently_merging' and reserves free space on filesystem.
/// It's possible to mark parts before.
struct CurrentlyMergingPartsTagger
{
MergeTreeData::DataPartsVector parts;
DiskSpaceMonitor::ReservationPtr reserved_space;
StorageMergeTree * storage = nullptr;
CurrentlyMergingPartsTagger() = default;
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(&storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage->full_path, total_size); /// May throw.
for (const auto & part : parts)
{
if (storage->currently_merging.count(part))
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage->currently_merging.insert(parts.begin(), parts.end());
}
~CurrentlyMergingPartsTagger()
{
std::lock_guard<std::mutex> lock(storage->currently_merging_mutex);
for (const auto & part : parts)
{
if (!storage->currently_merging.count(part))
std::terminate();
storage->currently_merging.erase(part);
}
}
2016-09-02 04:03:40 +00:00
};
bool StorageMergeTree::merge(
size_t aio_threshold,
bool aggressive,
const String & partition,
bool final,
bool deduplicate)
2014-03-13 12:48:07 +00:00
{
/// Clear old parts. It does not matter to do it more frequently than each second.
if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1))
{
data.clearOldParts();
data.clearOldTemporaryDirectories();
}
auto structure_lock = lockStructure(true);
2014-03-13 12:48:07 +00:00
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
2014-03-13 12:48:07 +00:00
/// You must call destructor under unlocked `currently_merging_mutex`.
std::experimental::optional<CurrentlyMergingPartsTagger> merging_tagger;
String merged_name;
2014-03-13 12:48:07 +00:00
{
std::lock_guard<std::mutex> lock(currently_merging_mutex);
2014-03-27 11:30:54 +00:00
MergeTreeData::DataPartsVector parts;
2016-08-13 01:59:09 +00:00
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return !currently_merging.count(left) && !currently_merging.count(right);
};
2016-08-13 01:59:09 +00:00
bool selected = false;
if (partition.empty())
{
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge();
if (max_parts_size_for_merge > 0)
selected = merger.selectPartsToMerge(parts, merged_name, aggressive, max_parts_size_for_merge, can_merge);
}
else
{
DayNum_t month = MergeTreeData::getMonthFromName(partition);
selected = merger.selectAllPartsToMergeWithinPartition(parts, merged_name, disk_space, can_merge, month, final);
}
2014-03-13 12:48:07 +00:00
if (!selected)
return false;
merging_tagger.emplace(parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(parts), *this);
}
2014-03-13 12:48:07 +00:00
MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, merged_name, merging_tagger->parts);
/// Logging
Stopwatch stopwatch;
auto new_part = merger.mergePartsToTemporaryPart(
merging_tagger->parts, merged_name, *merge_entry_ptr, aio_threshold, time(0), merging_tagger->reserved_space.get(), deduplicate);
merger.renameMergedTemporaryPart(merging_tagger->parts, new_part, merged_name, nullptr);
2014-03-13 12:48:07 +00:00
if (std::shared_ptr<PartLog> part_log = context.getPartLog())
{
PartLogElement elem;
elem.event_time = time(0);
elem.merged_from.reserve(merging_tagger->parts.size());
for (const auto & part : merging_tagger->parts)
elem.merged_from.push_back(part->name);
elem.event_type = PartLogElement::MERGE_PARTS;
elem.size_in_bytes = new_part->size_in_bytes;
elem.database_name = new_part->storage.getDatabaseName();
elem.table_name = new_part->storage.getTableName();
elem.part_name = new_part->name;
elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log->add(elem);
elem.duration_ms = 0;
elem.event_type = PartLogElement::REMOVE_PART;
elem.merged_from = Strings();
for (const auto & part : merging_tagger->parts)
{
elem.part_name = part->name;
elem.size_in_bytes = part->size_in_bytes;
part_log->add(elem);
}
}
return true;
2014-04-11 13:05:17 +00:00
}
bool StorageMergeTree::mergeTask()
2014-03-13 12:48:07 +00:00
{
if (shutdown_called)
return false;
try
{
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
2017-04-17 18:05:15 +00:00
return merge(aio_threshold, false /*aggressive*/, {} /*partition*/, false /*final*/, false /*deduplicate*/); ///TODO: read deduplicate option from table config
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(log, e.message());
return false;
}
throw;
}
2014-03-13 12:48:07 +00:00
}
void StorageMergeTree::dropColumnFromPartition(const ASTPtr & query, const Field & partition, const Field & column_name, const Settings &)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter();
DayNum_t month = MergeTreeData::getMonthDayNum(partition);
MergeTreeData::DataParts parts = data.getDataParts();
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
AlterCommand alter_command;
alter_command.type = AlterCommand::DROP_COLUMN;
alter_command.column_name = get<String>(column_name);
auto new_columns = data.getColumnsListNonMaterialized();
auto new_materialized_columns = data.materialized_columns;
auto new_alias_columns = data.alias_columns;
auto new_column_defaults = data.column_defaults;
alter_command.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
auto columns_for_parts = new_columns;
columns_for_parts.insert(std::end(columns_for_parts),
std::begin(new_materialized_columns), std::end(new_materialized_columns));
for (const auto & part : parts)
{
if (part->month != month)
continue;
if (auto transaction = data.alterDataPart(part, columns_for_parts, data.primary_expr_ast, false))
transactions.push_back(std::move(transaction));
LOG_DEBUG(log, "Removing column " << get<String>(column_name) << " from part " << part->name);
}
if (transactions.empty())
return;
for (auto & transaction : transactions)
transaction->commit();
}
2014-04-11 13:05:17 +00:00
void StorageMergeTree::dropPartition(const ASTPtr & query, const Field & partition, bool detach, const Settings & settings)
2014-10-03 17:57:01 +00:00
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter();
2014-10-03 18:41:16 +00:00
DayNum_t month = MergeTreeData::getMonthDayNum(partition);
2014-10-03 17:57:01 +00:00
size_t removed_parts = 0;
MergeTreeData::DataParts parts = data.getDataParts();
2014-10-03 17:57:01 +00:00
for (const auto & part : parts)
{
if (part->month != month)
continue;
2014-10-03 17:57:01 +00:00
LOG_DEBUG(log, "Removing part " << part->name);
++removed_parts;
2014-10-03 17:57:01 +00:00
if (detach)
data.renameAndDetachPart(part, "");
else
data.replaceParts({part}, {}, false);
}
2014-10-03 17:57:01 +00:00
LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << applyVisitor(FieldVisitorToString(), partition) << ".");
2014-10-03 17:57:01 +00:00
}
2014-10-03 18:41:16 +00:00
void StorageMergeTree::attachPartition(const ASTPtr & query, const Field & field, bool part, const Settings & settings)
2014-10-03 18:41:16 +00:00
{
String partition;
if (part)
partition = field.getType() == Field::Types::UInt64 ? toString(field.get<UInt64>()) : field.safeGet<String>();
else
partition = MergeTreeData::getMonthName(field);
String source_dir = "detached/";
/// Let's make a list of parts to add.
Strings parts;
if (part)
{
parts.push_back(partition);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir);
ActiveDataPartSet active_parts;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
if (!ActiveDataPartSet::isPartDirectory(name))
continue;
if (name.substr(0, partition.size()) != partition)
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
for (const auto & source_part_name : parts)
{
String source_path = source_dir + source_part_name;
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path);
data.renameTempPartAndAdd(part, &increment);
LOG_INFO(log, "Finished attaching part");
}
2017-04-16 15:00:33 +00:00
/// New parts with other data may appear in place of deleted parts.
context.resetCaches();
2014-10-03 18:41:16 +00:00
}
void StorageMergeTree::freezePartition(const Field & partition, const String & with_name, const Settings & settings)
{
/// The prefix can be arbitrary. Not necessarily a month - you can specify only a year.
data.freezePartition(partition.getType() == Field::Types::UInt64
? toString(partition.get<UInt64>())
: partition.safeGet<String>(), with_name);
}
2012-07-17 20:04:39 +00:00
}