KILL MUTATION skeleton [#CLICKHOUSE-3912]

This commit is contained in:
Alexey Zatelepin 2019-01-10 21:19:29 +03:00
parent 1512e17ab8
commit 67be566325
10 changed files with 121 additions and 34 deletions

View File

@ -11,6 +11,7 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/IStorage.h>
#include <thread>
#include <iostream>
#include <cstddef>
@ -61,12 +62,12 @@ struct QueryDescriptor
using QueryDescriptors = std::vector<QueryDescriptor>;
static void insertResultRow(size_t n, CancellationCode code, const Block & source_processes, const Block & sample_block, MutableColumns & columns)
static void insertResultRow(size_t n, CancellationCode code, const Block & source, const Block & header, MutableColumns & columns)
{
columns[0]->insert(cancellationCodeToStatus(code));
for (size_t col_num = 1, size = columns.size(); col_num < size; ++col_num)
columns[col_num]->insertFrom(*source_processes.getByName(sample_block.getByPosition(col_num).name).column, n);
columns[col_num]->insertFrom(*source.getByName(header.getByPosition(col_num).name).column, n);
}
static QueryDescriptors extractQueriesExceptMeAndCheckAccess(const Block & processes_block, Context & context)
@ -182,38 +183,82 @@ BlockIO InterpreterKillQueryQuery::execute()
return executeDDLQueryOnCluster(query_ptr, context, {"system"});
BlockIO res_io;
Block processes_block = getSelectFromSystemProcessesResult();
if (!processes_block)
return res_io;
ProcessList & process_list = context.getProcessList();
QueryDescriptors queries_to_stop = extractQueriesExceptMeAndCheckAccess(processes_block, context);
auto header = processes_block.cloneEmpty();
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
if (!query.sync || query.test)
if (!query.is_kill_mutation)
{
MutableColumns res_columns = header.cloneEmptyColumns();
Block processes_block = getSelectFromSystemProcessesResult();
if (!processes_block)
return res_io;
for (const auto & query_desc : queries_to_stop)
ProcessList & process_list = context.getProcessList();
QueryDescriptors queries_to_stop = extractQueriesExceptMeAndCheckAccess(processes_block, context);
auto header = processes_block.cloneEmpty();
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
if (!query.sync || query.test)
{
auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
MutableColumns res_columns = header.cloneEmptyColumns();
/// Raise exception if this query is immortal, user have to know
/// This could happen only if query generate streams that don't implement IBlockInputStream
if (code == CancellationCode::CancelCannotBeSent)
throw Exception("Can't kill query '" + query_desc.query_id + "' it consits of unkillable stages", ErrorCodes::CANNOT_KILL);
for (const auto & query_desc : queries_to_stop)
{
auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
/// Raise exception if this query is immortal, user have to know
/// This could happen only if query generate streams that don't implement IProfilingBlockInputStream
if (code == CancellationCode::CancelCannotBeSent)
throw Exception("Can't kill query '" + query_desc.query_id + "' it consits of unkillable stages", ErrorCodes::CANNOT_KILL);
insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
}
res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
}
else
{
res_io.in = std::make_shared<SyncKillQueryInputStream>(
process_list, std::move(queries_to_stop), std::move(processes_block), header);
}
res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
}
else
{
res_io.in = std::make_shared<SyncKillQueryInputStream>(
process_list, std::move(queries_to_stop), std::move(processes_block), header);
/// TODO: check permissions
Block mutations_block = getSelectFromSystemMutationsResult();
if (!mutations_block)
return res_io;
const ColumnString & database_col = typeid_cast<const ColumnString &>(*mutations_block.getByName("database").column);
const ColumnString & table_col = typeid_cast<const ColumnString &>(*mutations_block.getByName("table").column);
const ColumnString & mutation_id_col = typeid_cast<const ColumnString &>(*mutations_block.getByName("mutation_id").column);
auto header = mutations_block.cloneEmpty();
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
MutableColumns res_columns = header.cloneEmptyColumns();
for (size_t i = 0; i < mutations_block.rows(); ++i)
{
auto database_name = database_col.getDataAt(i).toString();
auto table_name = table_col.getDataAt(i).toString();
auto mutation_id = mutation_id_col.getDataAt(i).toString();
CancellationCode code = CancellationCode::Unknown;
if (!query.test)
{
auto storage = context.tryGetTable(database_name, table_name);
if (!storage)
code = CancellationCode::NotFound;
else
{
storage->killMutation(mutation_id);
code = CancellationCode::CancelSent;
}
}
insertResultRow(i, code, mutations_block, header, res_columns);
}
res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
}
return res_io;
@ -226,11 +271,26 @@ Block InterpreterKillQueryQuery::getSelectFromSystemProcessesResult()
if (where_expression)
system_processes_query += " WHERE " + queryToString(where_expression);
BlockIO block_io = executeQuery(system_processes_query, context, true);
Block res = block_io.in->read();
BlockIO system_processes_io = executeQuery(system_processes_query, context, true);
Block res = system_processes_io.in->read();
if (res && block_io.in->read())
throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);
if (res && system_processes_io.in->read())
return res;
}
Block InterpreterKillQueryQuery::getSelectFromSystemMutationsResult()
{
String system_mutations_query = "SELECT database, table, mutation_id FROM system.mutations";
auto & where_expression = static_cast<ASTKillQueryQuery &>(*query_ptr).where_expression;
if (where_expression)
system_mutations_query += " WHERE " + queryToString(where_expression);
BlockIO block_io = executeQuery(system_mutations_query, context, true);
Block res = block_io.in->read();
if (res && block_io.in->read())
throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);
return res;

View File

@ -21,6 +21,7 @@ public:
private:
Block getSelectFromSystemProcessesResult();
Block getSelectFromSystemMutationsResult();
ASTPtr query_ptr;
Context & context;

View File

@ -10,7 +10,8 @@ String ASTKillQueryQuery::getID(char delim) const
void ASTKillQueryQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL QUERY";
settings.ostr << (settings.hilite ? hilite_keyword : "") << "KILL "
<< (is_kill_mutation ? "MUTATION" : "QUERY");
formatOnCluster(settings);

View File

@ -8,6 +8,7 @@ namespace DB
class ASTKillQueryQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
bool is_kill_mutation = false; // if the query is KILL MUTATION.
ASTPtr where_expression; // expression to filter processes from system.processes table
bool sync = false; // SYNC or ASYNC mode
bool test = false; // does it TEST mode? (doesn't cancel queries just checks and shows them)

View File

@ -14,15 +14,24 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
String cluster_str;
auto query = std::make_shared<ASTKillQueryQuery>();
ParserKeyword p_kill{"KILL"};
ParserKeyword p_query{"QUERY"};
ParserKeyword p_mutation{"MUTATION"};
ParserKeyword p_on{"ON"};
ParserKeyword p_test{"TEST"};
ParserKeyword p_sync{"SYNC"};
ParserKeyword p_async{"ASYNC"};
ParserKeyword p_where{"WHERE"};
ParserKeyword p_kill_query{"KILL QUERY"};
ParserExpression p_where_expression;
if (!p_kill_query.ignore(pos, expected))
if (!p_kill.ignore(pos, expected))
return false;
if (p_query.ignore(pos, expected))
query->is_kill_mutation = false;
else if (p_mutation.ignore(pos, expected))
query->is_kill_mutation = true;
else
return false;
if (p_on.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster_str, expected))

View File

@ -257,6 +257,12 @@ public:
throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Cancel a mutation.
virtual void killMutation(const String & /*mutation_id*/)
{
throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** If the table have to do some complicated work on startup,
* that must be postponed after creation of table object
* (like launching some background threads),

View File

@ -343,7 +343,6 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
background_task_handle->wake();
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::lock_guard lock(currently_merging_mutex);
@ -388,6 +387,11 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
return result;
}
void StorageMergeTree::killMutation(const String & mutation_id)
{
LOG_TRACE(log, "KILL MUTATION " << mutation_id);
}
void StorageMergeTree::loadMutations()
{

View File

@ -63,8 +63,8 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
void killMutation(const String & mutation_id) override;
void drop() override;
void truncate(const ASTPtr &, const Context &) override;

View File

@ -4397,6 +4397,11 @@ std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsSta
return queue.getMutationsStatus();
}
void StorageReplicatedMergeTree::killMutation(const String & mutation_id)
{
LOG_TRACE(log, "KILL MUTATION " << mutation_id);
}
void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
{

View File

@ -121,8 +121,8 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
void killMutation(const String & mutation_id) override;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/