ClickHouse/dbms/src/Storages/StorageMergeTree.cpp

1096 lines
38 KiB
C++
Raw Normal View History

#include <Databases/IDatabase.h>
2018-12-25 23:13:30 +00:00
#include <Common/escapeForFileName.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
2018-12-25 23:13:30 +00:00
#include <Common/FieldVisitors.h>
#include <Common/localBackup.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/PartLog.h>
2018-12-25 23:13:30 +00:00
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
2018-12-25 23:13:30 +00:00
#include <Parsers/queryToString.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
2018-12-25 23:13:30 +00:00
#include <Storages/AlterCommands.h>
2018-12-25 23:18:07 +00:00
#include <Storages/PartitionCommands.h>
2018-12-25 23:13:30 +00:00
#include <Storages/StorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeList.h>
Squashed commit of the following: commit e712f469a55ff34ad34b482b15cc4153b7ad7233 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:59:13 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a002823084e3a79bffcc17d479620a68eb0644b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:58:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9e06f407c8ee781ed8ddf98bdfcc31846bf2a0fe Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:55:14 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9581620f1e839f456fa7894aa1f996d5162ac6cd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:54:22 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a8564c68cb6cc3649fafaf401256d43c9a2e777 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:47:34 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit cf60632d78ec656be3304ef4565e859bb6ce80ba Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:40:09 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit ee3d1dc6e0c4ca60e3ac1e0c30d4b3ed1e66eca0 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:22:49 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 65592ef7116a90104fcd524b53ef8b7cf22640f2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:18:17 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 37972c257320d3b7e7b294e0fdeffff218647bfd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:17:06 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit dd909d149974ce5bed2456de1261aa5a368fd3ff Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:16:28 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 3cf43266ca7e30adf01212b1a739ba5fe43639fd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:15:42 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 6731a3df96d1609286e2536b6432916af7743f0f Author: Alexey Milovidov 
<milovidov@yandex-team.ru> Date: Sat Jan 14 11:13:35 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 1b5727e0d56415b7add4cb76110105358663602c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:11:18 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit bbcf726a55685b8e72f5b40ba0bf1904bd1c0407 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:09:04 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit c03b477d5e2e65014e8906ecfa2efb67ee295af1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:06:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2986e2fb0466bc18d73693dcdded28fccc0dc66b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:05:44 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 5d6cdef13d2e02bd5c4954983334e9162ab2635b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:04:53 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit f2b819b25ce8b2ccdcb201eefb03e1e6f5aab590 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:01:47 2017 +0300 Less dependencies [#CLICKHOUSE-2]
2017-01-14 09:00:19 +00:00
2015-04-16 06:12:35 +00:00
#include <Poco/DirectoryIterator.h>
Squashed commit of the following: commit e712f469a55ff34ad34b482b15cc4153b7ad7233 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:59:13 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a002823084e3a79bffcc17d479620a68eb0644b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:58:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9e06f407c8ee781ed8ddf98bdfcc31846bf2a0fe Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:55:14 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9581620f1e839f456fa7894aa1f996d5162ac6cd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:54:22 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a8564c68cb6cc3649fafaf401256d43c9a2e777 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:47:34 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit cf60632d78ec656be3304ef4565e859bb6ce80ba Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:40:09 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit ee3d1dc6e0c4ca60e3ac1e0c30d4b3ed1e66eca0 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:22:49 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 65592ef7116a90104fcd524b53ef8b7cf22640f2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:18:17 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 37972c257320d3b7e7b294e0fdeffff218647bfd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:17:06 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit dd909d149974ce5bed2456de1261aa5a368fd3ff Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:16:28 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 3cf43266ca7e30adf01212b1a739ba5fe43639fd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:15:42 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 6731a3df96d1609286e2536b6432916af7743f0f Author: Alexey Milovidov 
<milovidov@yandex-team.ru> Date: Sat Jan 14 11:13:35 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 1b5727e0d56415b7add4cb76110105358663602c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:11:18 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit bbcf726a55685b8e72f5b40ba0bf1904bd1c0407 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:09:04 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit c03b477d5e2e65014e8906ecfa2efb67ee295af1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:06:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2986e2fb0466bc18d73693dcdded28fccc0dc66b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:05:44 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 5d6cdef13d2e02bd5c4954983334e9162ab2635b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:04:53 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit f2b819b25ce8b2ccdcb201eefb03e1e6f5aab590 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:01:47 2017 +0300 Less dependencies [#CLICKHOUSE-2]
2017-01-14 09:00:19 +00:00
#include <Poco/File.h>
2018-12-25 23:13:30 +00:00
#include <optional>
2012-07-19 20:32:10 +00:00
2012-07-17 20:04:39 +00:00
namespace DB
{
/// Error codes referenced in this translation unit. They are only declared here;
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_ARGUMENTS;
    extern const int INCORRECT_DATA;
    extern const int INCORRECT_FILE_NAME;
    extern const int CANNOT_ASSIGN_OPTIMIZE;
    extern const int INCOMPATIBLE_COLUMNS;
    /// Used by CurrentlyMergingPartsTagger; declared explicitly instead of relying on
    /// a transitive declaration from some included header.
    extern const int LOGICAL_ERROR;
}
/// Action-lock type identifiers. Only declared here (extern); the definition lives elsewhere.
namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
}
/// Creates the storage on top of MergeTreeData, using
/// `path_ + escapeForFileName(table_name_) + '/'` as the table directory.
///
/// After the base class and helpers are initialised it:
///  - rejects an empty `path_` (INCORRECT_FILE_NAME);
///  - calls loadDataParts();
///  - refuses to CREATE (i.e. `attach == false`) over a directory that already has parts (INCORRECT_DATA);
///  - seeds `increment` from getMaxBlockNumber() and calls loadMutations().
StorageMergeTree::StorageMergeTree(
    const String & path_,
    const String & database_name_,
    const String & table_name_,
    const ColumnsDescription & columns_,
    const IndicesDescription & indices_,
    bool attach,
    Context & context_,
    const String & date_column_name,
    const ASTPtr & partition_by_ast_,
    const ASTPtr & order_by_ast_,
    const ASTPtr & primary_key_ast_,
    const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
    const ASTPtr & ttl_table_ast_,
    const MergingParams & merging_params_,
    const MergeTreeSettings & settings_,
    bool has_force_restore_data_flag)
    : MergeTreeData(
        database_name_,
        table_name_,
        path_ + escapeForFileName(table_name_) + '/',
        columns_,
        indices_,
        context_,
        date_column_name,
        partition_by_ast_,
        order_by_ast_,
        primary_key_ast_,
        sample_by_ast_,
        ttl_table_ast_,
        merging_params_,
        settings_,
        false,
        attach)
    , path(path_)
    , background_pool(context_.getBackgroundPool())
    , reader(*this)
    , writer(*this)
    , merger_mutator(*this, global_context.getBackgroundPool())
{
    if (path.empty())
        throw Exception("MergeTree require data path", ErrorCodes::INCORRECT_FILE_NAME);

    loadDataParts(has_force_restore_data_flag);

    /// A freshly created table must start from an empty directory.
    const bool creating_new_table = !attach;
    if (creating_new_table && !getDataParts().empty())
        throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);

    increment.set(getMaxBlockNumber());

    loadMutations();
}
2012-07-17 20:04:39 +00:00
void StorageMergeTree::startup()
{
2019-05-03 02:00:57 +00:00
clearOldPartsFromFilesystem();
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
2019-05-03 02:00:57 +00:00
clearOldTemporaryDirectories(0);
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
background_task_handle = background_pool.addTask([this] { return backgroundTask(); });
}
2014-11-12 10:37:47 +00:00
2013-09-30 01:29:19 +00:00
void StorageMergeTree::shutdown()
2012-07-30 20:32:36 +00:00
{
if (shutdown_called)
return;
shutdown_called = true;
merger_mutator.actions_blocker.cancelForever();
if (background_task_handle)
background_pool.removeTask(background_task_handle);
2012-07-18 19:44:04 +00:00
}
2014-03-13 12:48:07 +00:00
/// Runs shutdown() before the object is destroyed (a no-op if it was already called).
StorageMergeTree::~StorageMergeTree()
{
    shutdown();
}
2012-07-18 19:44:04 +00:00
2012-07-21 05:07:14 +00:00
/// Delegates reading to `reader`; the requested processing stage is ignored.
BlockInputStreams StorageMergeTree::read(
    const Names & column_names,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    const size_t max_block_size,
    const unsigned num_streams)
{
    BlockInputStreams streams = reader.read(column_names, query_info, context, max_block_size, num_streams);
    return streams;
}
/// Returns an output stream that inserts into this table.
/// The stream is given the `max_partitions_per_insert_block` setting from the query context.
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
    const auto & settings = context.getSettingsRef();
    return std::make_shared<MergeTreeBlockOutputStream>(*this, settings.max_partitions_per_insert_block);
}
2018-08-03 13:17:32 +00:00
void StorageMergeTree::checkTableCanBeDropped() const
2012-08-16 18:17:01 +00:00
{
2019-05-03 02:00:57 +00:00
const_cast<StorageMergeTree &>(*this).recalculateColumnSizes();
global_context.checkTableCanBeDropped(database_name, table_name, getTotalActiveSizeInBytes());
}
void StorageMergeTree::checkPartitionCanBeDropped(const ASTPtr & partition)
{
2019-05-03 02:00:57 +00:00
const_cast<StorageMergeTree &>(*this).recalculateColumnSizes();
2018-08-10 01:41:54 +00:00
2019-05-03 02:00:57 +00:00
const String partition_id = getPartitionIDFromQuery(partition, global_context);
auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
UInt64 partition_size = 0;
for (const auto & part : parts_to_remove)
{
partition_size += part->bytes_on_disk;
}
global_context.checkPartitionCanBeDropped(database_name, table_name, partition_size);
}
void StorageMergeTree::drop()
{
shutdown();
2019-05-03 02:00:57 +00:00
dropAllData();
2018-04-21 00:35:20 +00:00
}
void StorageMergeTree::truncate(const ASTPtr &, const Context &)
2018-04-21 00:35:20 +00:00
{
2018-06-09 15:48:22 +00:00
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// NOTE: It's assumed that this method is called under lockForAlter.
2018-06-09 15:48:22 +00:00
2019-05-03 02:00:57 +00:00
auto parts_to_remove = getDataPartsVector();
removePartsFromWorkingSet(parts_to_remove, true);
2018-04-21 00:35:20 +00:00
2018-06-09 15:48:22 +00:00
LOG_INFO(log, "Removed " << parts_to_remove.size() << " parts.");
}
2019-05-03 02:00:57 +00:00
clearOldPartsFromFilesystem();
2013-08-07 13:07:42 +00:00
}
2017-12-01 21:13:25 +00:00
void StorageMergeTree::rename(const String & new_path_to_db, const String & /*new_database_name*/, const String & new_table_name)
{
std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
2014-03-13 19:14:25 +00:00
2019-05-03 02:00:57 +00:00
setPath(new_full_path);
2014-03-13 12:48:07 +00:00
path = new_path_to_db;
table_name = new_table_name;
full_path = new_full_path;
2014-03-13 12:48:07 +00:00
/// NOTE: Logger names are not updated.
}
2018-04-21 00:35:20 +00:00
void StorageMergeTree::alter(
const AlterCommands & params,
const String & current_database_name,
const String & current_table_name,
2019-03-05 10:12:20 +00:00
const Context & context,
2019-03-07 20:52:25 +00:00
TableStructureWriteLockHolder & table_lock_holder)
2013-08-07 13:07:42 +00:00
{
2019-05-02 23:56:42 +00:00
if (!params.isMutable())
2018-11-15 13:12:27 +00:00
{
2019-03-07 20:52:25 +00:00
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
2018-11-15 13:12:27 +00:00
auto new_columns = getColumns();
2019-05-02 16:07:23 +00:00
auto new_indices = getIndices();
2018-11-15 13:12:27 +00:00
params.apply(new_columns);
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data 
* fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, {});
2018-11-15 13:12:27 +00:00
setColumns(std::move(new_columns));
return;
}
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
2019-03-07 20:52:25 +00:00
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
2019-05-03 02:00:57 +00:00
checkAlter(params, context);
2019-05-03 02:00:57 +00:00
auto new_columns = getColumns();
auto new_indices = getIndices();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
params.apply(new_columns, new_indices, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast);
2019-05-03 02:00:57 +00:00
auto parts = getDataParts({MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
2018-03-13 15:00:28 +00:00
auto columns_for_parts = new_columns.getAllPhysical();
2019-05-03 02:00:57 +00:00
std::vector<AlterDataPartTransactionPtr> transactions;
for (const DataPartPtr & part : parts)
{
2019-05-03 02:00:57 +00:00
if (auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false))
transactions.push_back(std::move(transaction));
}
2019-03-07 20:52:25 +00:00
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
2018-02-26 03:37:08 +00:00
{
2019-03-15 17:09:14 +00:00
auto & storage_ast = ast.as<ASTStorage &>();
2019-05-03 02:00:57 +00:00
if (new_order_by_ast.get() != order_by_ast.get())
2019-03-15 17:09:14 +00:00
storage_ast.set(storage_ast.order_by, new_order_by_ast);
2019-05-03 02:00:57 +00:00
if (new_primary_key_ast.get() != primary_key_ast.get())
2019-03-15 17:09:14 +00:00
storage_ast.set(storage_ast.primary_key, new_primary_key_ast);
2019-05-03 02:00:57 +00:00
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
};
Data Skipping Indices (#4143) * made index parser * added index parsing * some fixes * added index interface and factory * fixed compilation * ptrs * added indexParts * indextypes * index condition * IndexCondition * added indexes in selectexecutor * fix * changed comment * fix * added granularity * comments * fix * fix * added writing indexes * removed indexpart class * fix * added setSkipIndexes * add rw for MergeTreeIndexes * fixes * upd error * fix * fix * reading * test index * fixed nullptr error * fixed * fix * unique names * asts -> exprlist * minmax index * fix * fixed select * fixed merging * fixed mutation * working minmax * removed test index * fixed style * added indexes to checkDataPart * added tests for minmax index * fixed constructor * fix style * fixed includes * fixed setSkipIndexes * added indexes meta to zookeeper * added parsing * removed throw * alter cmds parse * fix * added alter * fix * alters fix * fix alters * fix "after" * fixed alter * alter fix + test * fixes * upd setSkipIndexes * fixed alter bug with drop all indices * fix metadata editing * new test and repl fix * rm test files * fixed repl alter * fix * fix * indices * MTReadStream * upd test for bug * fix * added useful parsers and ast classes * fix * fix comments * replaced columns * fix * fixed parsing * fixed printing * fix err * basic IndicesDescription * go to IndicesDescr * moved indices * go to indicesDescr * fix test minmax_index* * fixed MT alter * fixed bug with replMT indices storing in zk * rename * refactoring * docs ru * docs ru * docs en * refactor * rename tests * fix docs * refactoring * fix * fix * fix * fixed style * unique idx * unique * fix * better minmax calculation * upd * added getBlock * unique_condition * added termForAST * unique * fixed not * uniqueCondition::mayBeTrueOnGranule * fix * fixed bug with double column * is always true * fix * key set * spaces * test * tests * fix * unique * fix * fix * fixed bug with duplicate column * removed unused data 
* fix * fixes * __bitSwapLastTwo * fix
2019-02-05 14:50:25 +00:00
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, storage_modifier);
/// Reinitialize primary key because primary key column types might have changed.
2019-05-03 02:00:57 +00:00
setPrimaryKeyIndicesAndColumns(new_order_by_ast, new_primary_key_ast, new_columns, new_indices);
2019-05-03 02:00:57 +00:00
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
for (auto & transaction : transactions)
transaction->commit();
/// Columns sizes could be changed
2019-05-03 02:00:57 +00:00
recalculateColumnSizes();
2014-03-20 13:00:42 +00:00
}
2016-09-02 04:03:40 +00:00
/// While exists, marks parts as 'currently_merging' and reserves free space on filesystem.
///
/// The constructor expects `storage.currently_merging_mutex` to be held by the caller;
/// the destructor takes that mutex itself, so the object must be destroyed with the mutex unlocked.
/// On destruction the failure information of the affected mutation entries is updated
/// according to `is_successful` / `exception_message`.
struct CurrentlyMergingPartsTagger
{
    FutureMergedMutatedPart future_part;
    DiskSpaceMonitor::ReservationPtr reserved_space;

    /// Set by the owner after the merge/mutation has completed successfully.
    bool is_successful = false;
    /// Set by the owner on failure; recorded as the fail reason of the affected mutations.
    String exception_message;

    StorageMergeTree & storage;

    /// Reserves `total_size` bytes under `storage.full_path` and tags all source parts.
    /// Throws if the reservation fails or if any of the parts is already tagged.
    CurrentlyMergingPartsTagger(const FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_)
        : future_part(future_part_), storage(storage_)
    {
        /// Assume mutex is already locked, because this method is called from mergeTask.
        reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// May throw.

        /// Check all parts before tagging any, so that nothing is half-tagged if we throw.
        for (const auto & part : future_part.parts)
        {
            if (storage.currently_merging.count(part))
                throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
        }

        storage.currently_merging.insert(future_part.parts.begin(), future_part.parts.end());
    }

    ~CurrentlyMergingPartsTagger()
    {
        std::lock_guard lock(storage.currently_merging_mutex);

        for (const auto & part : future_part.parts)
        {
            /// A part we tagged must still be tagged; anything else is an unrecoverable inconsistency.
            if (!storage.currently_merging.count(part))
                std::terminate();
            storage.currently_merging.erase(part);
        }

        /// Update the information about failed parts in the system.mutations table.

        Int64 sources_data_version = future_part.parts.at(0)->info.getDataVersion();
        Int64 result_data_version = future_part.part_info.getDataVersion();

        /// The range stays empty unless the data version changes between sources and result.
        auto mutations_begin_it = storage.current_mutations_by_version.end();
        auto mutations_end_it = storage.current_mutations_by_version.end();
        if (sources_data_version != result_data_version)
        {
            mutations_begin_it = storage.current_mutations_by_version.upper_bound(sources_data_version);
            mutations_end_it = storage.current_mutations_by_version.upper_bound(result_data_version);
        }

        for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
        {
            MergeTreeMutationEntry & entry = it->second;
            if (is_successful)
            {
                /// Clear the recorded failure only if the new part covers the part that failed.
                if (!entry.latest_failed_part.empty() && future_part.part_info.contains(entry.latest_failed_part_info))
                {
                    entry.latest_failed_part.clear();
                    entry.latest_failed_part_info = MergeTreePartInfo();
                    entry.latest_fail_time = 0;
                    entry.latest_fail_reason.clear();
                }
            }
            else
            {
                entry.latest_failed_part = future_part.parts.at(0)->name;
                entry.latest_failed_part_info = future_part.parts.at(0)->info;
                entry.latest_fail_time = time(nullptr);
                entry.latest_fail_reason = exception_message;
            }
        }
    }
};
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
2019-05-03 02:00:57 +00:00
MergeTreeMutationEntry entry(commands, full_path, insert_increment.get());
2018-09-07 15:54:34 +00:00
String file_name;
{
std::lock_guard lock(currently_merging_mutex);
Int64 version = increment.get();
entry.commit(version);
2018-09-07 15:54:34 +00:00
file_name = entry.file_name;
auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry));
current_mutations_by_version.emplace(version, insertion.first->second);
}
2018-09-07 15:54:34 +00:00
LOG_INFO(log, "Added mutation: " << file_name);
background_task_handle->wake();
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_merging_mutex);
std::vector<Int64> part_data_versions;
2019-05-03 02:00:57 +00:00
auto data_parts = getDataPartsVector();
part_data_versions.reserve(data_parts.size());
for (const auto & part : data_parts)
part_data_versions.push_back(part->info.getDataVersion());
2018-07-23 18:03:09 +00:00
std::sort(part_data_versions.begin(), part_data_versions.end());
std::vector<MergeTreeMutationStatus> result;
for (const auto & kv : current_mutations_by_version)
{
Int64 mutation_version = kv.first;
const MergeTreeMutationEntry & entry = kv.second;
auto versions_it = std::lower_bound(
part_data_versions.begin(), part_data_versions.end(), mutation_version);
Int64 parts_to_do = versions_it - part_data_versions.begin();
std::map<String, Int64> block_numbers_map({{"", entry.block_number}});
for (const MutationCommand & command : entry.commands)
{
std::stringstream ss;
formatAST(*command.ast, ss, false, true);
result.push_back(MergeTreeMutationStatus
{
entry.file_name,
ss.str(),
entry.create_time,
block_numbers_map,
parts_to_do,
(parts_to_do == 0),
entry.latest_failed_part,
entry.latest_fail_time,
entry.latest_fail_reason,
});
}
}
return result;
}
/// Removes the mutation with the given id from the in-memory maps, cancels part mutations
/// for its block number, and deletes its file.
/// Returns NotFound if no such mutation is registered, CancelSent otherwise.
CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
{
    LOG_TRACE(log, "Killing mutation " << mutation_id);

    std::optional<MergeTreeMutationEntry> to_kill;
    {
        std::lock_guard lock(currently_merging_mutex);

        auto found = current_mutations_by_id.find(mutation_id);
        if (found != current_mutations_by_id.end())
        {
            /// Take the entry out of the map, then drop both index records.
            to_kill.emplace(std::move(found->second));
            current_mutations_by_id.erase(found);
            current_mutations_by_version.erase(to_kill->block_number);
        }
    }

    if (!to_kill)
        return CancellationCode::NotFound;

    global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
    to_kill->removeFile();
    LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);

    /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
    background_task_handle->wake();

    return CancellationCode::CancelSent;
}
void StorageMergeTree::loadMutations()
{
Poco::DirectoryIterator end;
for (auto it = Poco::DirectoryIterator(full_path); it != end; ++it)
{
if (startsWith(it.name(), "mutation_"))
{
MergeTreeMutationEntry entry(full_path, it.name());
Int64 block_number = entry.block_number;
auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
current_mutations_by_version.emplace(block_number, insertion.first->second);
}
else if (startsWith(it.name(), "tmp_mutation_"))
{
it->remove();
}
}
if (!current_mutations_by_version.empty())
increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first);
}
bool StorageMergeTree::merge(
bool aggressive,
const String & partition_id,
bool final,
bool deduplicate,
String * out_disable_reason)
2014-03-13 12:48:07 +00:00
{
2019-03-07 20:52:25 +00:00
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
2014-03-13 12:48:07 +00:00
FutureMergedMutatedPart future_part;
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMergingPartsTagger> merging_tagger;
2014-03-13 12:48:07 +00:00
{
2019-01-02 06:44:36 +00:00
std::lock_guard lock(currently_merging_mutex);
2014-03-27 11:30:54 +00:00
2019-05-03 02:00:57 +00:00
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *)
{
return !currently_merging.count(left) && !currently_merging.count(right)
&& getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
};
2016-08-13 01:59:09 +00:00
bool selected = false;
if (partition_id.empty())
{
2018-12-18 17:41:03 +00:00
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
if (max_source_parts_size > 0)
selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
else if (out_disable_reason)
*out_disable_reason = "Current value of max_source_parts_size is zero";
}
else
{
2018-07-08 05:26:51 +00:00
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
2014-03-13 12:48:07 +00:00
if (!selected)
return false;
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this);
}
2014-03-13 12:48:07 +00:00
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part);
/// Logging
Stopwatch stopwatch;
2019-05-03 02:00:57 +00:00
MutableDataPartPtr new_part;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
try
{
auto part_log = global_context.getPartLog(database_name);
if (!part_log)
return;
PartLogElement part_log_elem;
2014-03-13 12:48:07 +00:00
part_log_elem.event_type = PartLogElement::MERGE_PARTS;
part_log_elem.event_time = time(nullptr);
part_log_elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
2019-01-31 17:30:56 +00:00
part_log_elem.partition_id = future_part.part_info.partition_id;
part_log_elem.part_name = future_part.name;
if (new_part)
part_log_elem.bytes_compressed_on_disk = new_part->bytes_on_disk;
part_log_elem.source_part_names.reserve(future_part.parts.size());
for (const auto & source_part : future_part.parts)
part_log_elem.source_part_names.push_back(source_part->name);
part_log_elem.rows_read = (*merge_entry)->rows_read;
part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;
part_log_elem.rows = (*merge_entry)->rows_written;
part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
part_log_elem.error = static_cast<UInt16>(execution_status.code);
part_log_elem.exception = execution_status.message;
part_log->add(part_log_elem);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
try
{
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, *merge_entry, time(nullptr),
merging_tagger->reserved_space.get(), deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
2019-05-03 02:00:57 +00:00
removeEmptyColumnsFromPart(new_part);
merging_tagger->is_successful = true;
write_part_log({});
}
catch (...)
{
merging_tagger->exception_message = getCurrentExceptionMessage(false);
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
return true;
2014-04-11 13:05:17 +00:00
}
2018-04-21 00:35:20 +00:00
bool StorageMergeTree::tryMutatePart()
{
2019-03-07 20:52:25 +00:00
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
FutureMergedMutatedPart future_part;
MutationCommands commands;
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMergingPartsTagger> tagger;
{
auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
2019-01-02 06:44:36 +00:00
std::lock_guard lock(currently_merging_mutex);
if (current_mutations_by_version.empty())
return false;
auto mutations_end_it = current_mutations_by_version.end();
2019-05-03 02:00:57 +00:00
for (const auto & part : getDataPartsVector())
{
if (currently_merging.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
auto estimated_needed_space = MergeTreeDataMergerMutator::estimateNeededDiskSpace({part});
if (estimated_needed_space > disk_space)
continue;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
tagger.emplace(future_part, estimated_needed_space, *this);
break;
}
}
if (!tagger)
return false;
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part);
Stopwatch stopwatch;
2019-05-03 02:00:57 +00:00
MutableDataPartPtr new_part;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
try
{
auto part_log = global_context.getPartLog(database_name);
if (!part_log)
return;
PartLogElement part_log_elem;
part_log_elem.event_type = PartLogElement::MUTATE_PART;
part_log_elem.error = static_cast<UInt16>(execution_status.code);
part_log_elem.exception = execution_status.message;
part_log_elem.event_time = time(nullptr);
part_log_elem.duration_ms = stopwatch.elapsed() / 1000000;
part_log_elem.database_name = database_name;
part_log_elem.table_name = table_name;
2019-01-31 17:30:56 +00:00
part_log_elem.partition_id = future_part.part_info.partition_id;
part_log_elem.part_name = future_part.name;
part_log_elem.rows_read = (*merge_entry)->rows_read;
part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;
part_log_elem.rows = (*merge_entry)->rows_written;
part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
if (new_part)
part_log_elem.bytes_compressed_on_disk = new_part->bytes_on_disk;
part_log_elem.source_part_names.reserve(future_part.parts.size());
for (const auto & source_part : future_part.parts)
part_log_elem.source_part_names.push_back(source_part->name);
part_log->add(part_log_elem);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context);
2019-05-03 02:00:57 +00:00
renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});
}
catch (...)
{
tagger->exception_message = getCurrentExceptionMessage(false);
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
return true;
}
2018-12-26 17:03:29 +00:00
/// One iteration of background work: periodic cleanup, then an attempt to merge parts,
/// and, if nothing was merged, an attempt to mutate a part.
/// Returns SUCCESS if a merge or a mutation was performed, ERROR otherwise
/// (including the case when the work was aborted with ErrorCodes::ABORTED).
BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
{
    /// Bail out early if shutdown was requested or the merger/mutator action blocker reports cancellation.
    if (shutdown_called || merger_mutator.actions_blocker.isCancelled())
        return BackgroundProcessingPoolTaskResult::ERROR;

    try
    {
        /// Clear old parts. It is unnecessary to do it more than once a second.
        if (auto cleanup_guard = time_after_previous_cleanup.compareAndRestartDeferred(1))
        {
            clearOldPartsFromFilesystem();

            {
                /// TODO: Implement tryLockStructureForShare.
                auto structure_lock = lockStructureForShare(false, "");
                clearOldTemporaryDirectories();
            }

            clearOldMutations();
        }

        /// TODO: read deduplicate option from table config.
        /// A mutation is attempted only if there was nothing to merge.
        const bool did_work
            = merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)
            || tryMutatePart();

        return did_work ? BackgroundProcessingPoolTaskResult::SUCCESS : BackgroundProcessingPoolTaskResult::ERROR;
    }
    catch (Exception & e)
    {
        /// Only ABORTED is handled here; everything else goes to the caller.
        if (e.code() != ErrorCodes::ABORTED)
            throw;

        LOG_INFO(log, e.message());
        return BackgroundProcessingPoolTaskResult::ERROR;
    }
}
/// Returns the version of the last mutation in `current_mutations_by_version` whose version
/// is not greater than the data version of the part, or 0 if there is no such mutation.
/// The unused lock parameter only documents that `currently_merging_mutex` must be held by the caller.
Int64 StorageMergeTree::getCurrentMutationVersion(
    const DataPartPtr & part,
    std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const
{
    /// First mutation with a version strictly greater than the part's data version.
    auto next_mutation = current_mutations_by_version.upper_bound(part->info.getDataVersion());

    if (next_mutation == current_mutations_by_version.begin())
        return 0;

    return std::prev(next_mutation)->first;
}
void StorageMergeTree::clearOldMutations()
{
2019-05-03 02:00:57 +00:00
if (!settings.finished_mutations_to_keep)
return;
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::lock_guard lock(currently_merging_mutex);
2019-05-03 02:00:57 +00:00
if (current_mutations_by_version.size() <= settings.finished_mutations_to_keep)
return;
auto begin_it = current_mutations_by_version.begin();
2019-05-03 02:00:57 +00:00
std::optional<Int64> min_version = getMinPartDataVersion();
auto end_it = current_mutations_by_version.end();
if (min_version)
end_it = current_mutations_by_version.upper_bound(*min_version);
size_t done_count = std::distance(begin_it, end_it);
2019-05-03 02:00:57 +00:00
if (done_count <= settings.finished_mutations_to_keep)
return;
2019-05-03 02:00:57 +00:00
size_t to_delete_count = done_count - settings.finished_mutations_to_keep;
auto it = begin_it;
for (size_t i = 0; i < to_delete_count; ++i)
{
mutations_to_delete.push_back(std::move(it->second));
current_mutations_by_id.erase(mutations_to_delete.back().file_name);
it = current_mutations_by_version.erase(it);
}
}
for (auto & mutation : mutations_to_delete)
{
LOG_TRACE(log, "Removing mutation: " << mutation.file_name);
mutation.removeFile();
}
}
2019-05-09 14:25:18 +00:00
void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context)
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
2019-05-03 02:00:57 +00:00
String partition_id = getPartitionIDFromQuery(partition, context);
auto parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
2019-05-03 02:00:57 +00:00
std::vector<AlterDataPartTransactionPtr> transactions;
auto new_columns = getColumns();
2019-05-02 16:07:23 +00:00
auto new_indices = getIndices();
ASTPtr ignored_order_by_ast;
ASTPtr ignored_primary_key_ast;
ASTPtr ignored_ttl_table_ast;
alter_command.apply(new_columns, new_indices, ignored_order_by_ast, ignored_primary_key_ast, ignored_ttl_table_ast);
2018-03-13 15:00:28 +00:00
auto columns_for_parts = new_columns.getAllPhysical();
for (const auto & part : parts)
{
if (part->info.partition_id != partition_id)
throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
2019-05-03 02:00:57 +00:00
if (auto transaction = alterDataPart(part, columns_for_parts, new_indices.indices, false))
transactions.push_back(std::move(transaction));
2019-05-09 14:25:18 +00:00
if (alter_command.type == AlterCommand::DROP_COLUMN)
LOG_DEBUG(log, "Removing column " << alter_command.column_name << " from part " << part->name);
else if (alter_command.type == AlterCommand::DROP_INDEX)
LOG_DEBUG(log, "Removing index " << alter_command.index_name << " from part " << part->name);
}
if (transactions.empty())
return;
for (auto & transaction : transactions)
transaction->commit();
/// Recalculate columns size (not only for the modified column)
2019-05-03 02:00:57 +00:00
recalculateColumnSizes();
}
2014-04-11 13:05:17 +00:00
bool StorageMergeTree::optimize(
2017-12-01 21:13:25 +00:00
const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
{
String disable_reason;
if (!partition && final)
{
2019-05-03 02:00:57 +00:00
DataPartsVector data_parts = getDataPartsVector();
std::unordered_set<String> partition_ids;
2019-05-03 02:00:57 +00:00
for (const DataPartPtr & part : data_parts)
2018-07-05 18:45:18 +00:00
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
{
2018-09-12 19:57:14 +00:00
if (!merge(true, partition_id, true, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
}
}
else
{
String partition_id;
if (partition)
2019-05-03 02:00:57 +00:00
partition_id = getPartitionIDFromQuery(partition, context);
2018-09-12 19:57:14 +00:00
if (!merge(true, partition_id, final, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
}
return true;
}
/// Executes partition-level ALTER commands one by one, dispatching on the command type.
/// Command types that are not handled here are passed to IStorage::alterPartition,
/// which is expected to throw.
void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context)
{
    for (const PartitionCommand & command : commands)
    {
        switch (command.type)
        {
            case PartitionCommand::DROP_PARTITION:
                checkPartitionCanBeDropped(command.partition);
                dropPartition(command.partition, command.detach, context);
                break;

            case PartitionCommand::ATTACH_PARTITION:
                attachPartition(command.partition, command.part, context);
                break;

            case PartitionCommand::REPLACE_PARTITION:
            {
                checkPartitionCanBeDropped(command.partition);
                /// An empty source database name means the current database of the query.
                String from_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database;
                auto from_storage = context.getTable(from_database, command.from_table);
                replacePartitionFrom(from_storage, command.partition, command.replace, context);
            }
            break;

            case PartitionCommand::FREEZE_PARTITION:
            {
                auto lock = lockStructureForShare(false, context.getCurrentQueryId());
                freezePartition(command.partition, command.with_name, context);
            }
            break;

            case PartitionCommand::CLEAR_COLUMN:
            {
                /// Clearing a column in a partition is expressed as a DROP_COLUMN command
                /// applied to the parts of that partition only.
                AlterCommand alter_command;
                alter_command.type = AlterCommand::DROP_COLUMN;
                alter_command.column_name = get<String>(command.column_name);
                clearColumnOrIndexInPartition(command.partition, alter_command, context);
            }
            break;

            case PartitionCommand::CLEAR_INDEX:
            {
                /// Must be DROP_INDEX with `index_name` set: clearColumnOrIndexInPartition() reads
                /// `index_name` for DROP_INDEX commands. Using DROP_COLUMN here would instead try
                /// to drop a column named like the index.
                AlterCommand alter_command;
                alter_command.type = AlterCommand::DROP_INDEX;
                alter_command.index_name = get<String>(command.index_name);
                clearColumnOrIndexInPartition(command.partition, alter_command, context);
            }
            break;

            case PartitionCommand::FREEZE_ALL_PARTITIONS:
            {
                auto lock = lockStructureForShare(false, context.getCurrentQueryId());
                freezeAll(command.with_name, context);
            }
            break;

            default:
                IStorage::alterPartition(query, commands, context); // should throw an exception.
        }
    }
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context)
2014-10-03 17:57:01 +00:00
{
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.actions_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
2019-03-05 10:12:20 +00:00
auto lock = lockExclusively(context.getCurrentQueryId());
2019-05-03 02:00:57 +00:00
String partition_id = getPartitionIDFromQuery(partition, context);
2014-10-03 17:57:01 +00:00
/// TODO: should we include PreComitted parts like in Replicated case?
2019-05-03 02:00:57 +00:00
auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
removePartsFromWorkingSet(parts_to_remove, true);
2014-10-03 17:57:01 +00:00
if (detach)
{
/// If DETACH clone parts to detached/ directory
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching " << part->relative_path);
part->makeCloneInDetached("");
}
}
LOG_INFO(log, (detach ? "Detached " : "Removed ") << parts_to_remove.size() << " parts inside partition ID " << partition_id << ".");
}
2014-10-03 17:57:01 +00:00
2019-05-03 02:00:57 +00:00
clearOldPartsFromFilesystem();
2014-10-03 17:57:01 +00:00
}
2014-10-03 18:41:16 +00:00
void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_part, const Context & context)
2014-10-03 18:41:16 +00:00
{
// TODO: should get some locks to prevent race with 'alter … modify column'
String partition_id;
if (attach_part)
2019-03-15 17:09:14 +00:00
partition_id = partition->as<ASTLiteral &>().value.safeGet<String>();
else
2019-05-03 02:00:57 +00:00
partition_id = getPartitionIDFromQuery(partition, context);
String source_dir = "detached/";
/// Let's make a list of parts to add.
Strings parts;
if (attach_part)
{
parts.push_back(partition_id);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
2019-05-03 02:00:57 +00:00
ActiveDataPartSet active_parts(format_version);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
const String & name = it.name();
MergeTreePartInfo part_info;
2019-05-03 02:00:57 +00:00
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
|| part_info.partition_id != partition_id)
{
continue;
}
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
for (const auto & source_part_name : parts)
{
String source_path = source_dir + source_part_name;
LOG_DEBUG(log, "Checking data");
2019-05-03 02:00:57 +00:00
MutableDataPartPtr part = loadPartAndFixMetadata(source_path);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path);
2019-05-03 02:00:57 +00:00
renameTempPartAndAdd(part, &increment);
LOG_INFO(log, "Finished attaching part");
}
2017-04-16 15:00:33 +00:00
/// New parts with other data may appear in place of deleted parts.
context.dropCaches();
2014-10-03 18:41:16 +00:00
}
void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context)
{
auto lock1 = lockStructureForShare(false, context.getCurrentQueryId());
auto lock2 = source_table->lockStructureForShare(false, context.getCurrentQueryId());
Stopwatch watch;
2019-05-03 02:00:57 +00:00
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table);
String partition_id = getPartitionIDFromQuery(partition, context);
2019-05-03 02:00:57 +00:00
DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
MutableDataPartsVector dst_parts;
static const String TMP_PREFIX = "tmp_replace_from_";
2019-05-03 02:00:57 +00:00
for (const DataPartPtr & src_part : src_parts)
{
/// This will generate unique name in scope of current server process.
2019-05-03 02:00:57 +00:00
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
std::shared_lock<std::shared_mutex> part_lock(src_part->columns_lock);
2019-05-03 02:00:57 +00:00
dst_parts.emplace_back(cloneAndLoadDataPart(src_part, TMP_PREFIX, dst_part_info));
}
/// ATTACH empty part set
if (!replace && dst_parts.empty())
return;
MergeTreePartInfo drop_range;
if (replace)
{
drop_range.partition_id = partition_id;
drop_range.min_block = 0;
drop_range.max_block = increment.get(); // there will be a "hole" in block numbers
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
}
/// Atomically add new parts and remove old ones
try
{
{
/// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
/// and we should be able to rollback already added (Precomitted) parts
2019-05-03 02:00:57 +00:00
Transaction transaction(*this);
2019-05-03 02:00:57 +00:00
auto data_parts_lock = lockParts();
/// Populate transaction
2019-05-03 02:00:57 +00:00
for (MutableDataPartPtr & part : dst_parts)
renameTempPartAndReplace(part, &increment, &transaction, data_parts_lock);
transaction.commit(&data_parts_lock);
/// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block
if (replace)
2019-05-03 02:00:57 +00:00
removePartsInRangeFromWorkingSet(drop_range, true, false, data_parts_lock);
}
PartLog::addNewParts(global_context, dst_parts, watch.elapsed());
}
catch (...)
{
PartLog::addNewParts(global_context, dst_parts, watch.elapsed(), ExecutionStatus::fromCurrentException());
throw;
}
}
/// Returns the lock obtained from `merger_mutator.actions_blocker.cancel()` for the
/// ActionLocks::PartsMerge action type, and a default-constructed ActionLock for any other type.
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
{
    if (action_type != ActionLocks::PartsMerge)
        return {};

    return merger_mutator.actions_blocker.cancel();
}
2012-07-17 20:04:39 +00:00
}