Add system.detached_parts table.

https://github.com/yandex/ClickHouse/issues/5164
This commit is contained in:
Alexander Kuzmenkov 2019-05-20 19:24:36 +03:00
parent a08fee57b7
commit 43655a8db7
18 changed files with 372 additions and 199 deletions

View File

@ -2568,6 +2568,39 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
return res;
}
std::vector<DetachedPartInfo>
MergeTreeData::getDetachedParts() const
{
std::vector<DetachedPartInfo> res;
for (Poco::DirectoryIterator it(full_path + "detached");
it != Poco::DirectoryIterator(); ++it)
{
auto dir_name = it.name();
res.emplace_back();
auto & part = res.back();
/// First, try to parse as <part_name>.
if (MergeTreePartInfo::tryParsePartName(dir_name, &part, format_version))
continue;
/// Next, as <prefix>_<partname>. Use entire name as prefix if it fails.
part.prefix = dir_name;
const auto first_separator = dir_name.find_first_of('_');
if (first_separator == String::npos)
continue;
const auto part_name = dir_name.substr(first_separator + 1,
dir_name.size() - first_separator - 1);
if (!MergeTreePartInfo::tryParsePartName(part_name, &part, format_version))
continue;
part.prefix = dir_name.substr(0, first_separator);
}
return res;
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{
DataParts res;

View File

@ -413,6 +413,9 @@ public:
/// Returns absolutely all parts (and snapshot of their states)
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
/// Returns a description of every entry found in the table's "detached" directory:
/// the part info parsed from the entry name (when it can be parsed) and the prefix
/// that precedes the part name, if any.
std::vector<DetachedPartInfo> getDetachedParts() const;
/// Returns Committed parts
DataParts getDataParts() const;
DataPartsVector getDataPartsVector() const;

View File

@ -437,25 +437,26 @@ void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_n
String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
/// Do not allow underscores in the prefix because they are used as separators.
assert(prefix.find_first_of('_') == String::npos);
String res;
unsigned try_no = 0;
auto dst_name = [&, this] { return "detached/" + prefix + name + (try_no ? "_try" + DB::toString(try_no) : ""); };
/** If you need to detach a part, and directory into which we want to rename it already exists,
* we will rename to the directory with the name to which the suffix is added in the form of "_tryN".
* This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
* No more than 10 attempts are made so that there are not too many junk directories left.
*/
while (try_no < 10)
for (int try_no = 0; try_no < 10; try_no++)
{
res = dst_name();
res = "detached/" + (prefix.empty() ? "" : prefix + "_")
+ name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!Poco::File(storage.full_path + res).exists())
return res;
LOG_WARNING(storage.log, "Directory " << dst_name() << " (to detach to) is already exist."
LOG_WARNING(storage.log, "Directory " << res << " (to detach to) already exists."
" Will detach to directory with '_tryN' suffix.");
++try_no;
}
return res;

View File

@ -52,6 +52,12 @@ bool MergeTreePartInfo::tryParsePartName(const String & dir_name, MergeTreePartI
}
}
/// Sanity check
if (partition_id.empty())
{
return false;
}
Int64 min_block_num = 0;
Int64 max_block_num = 0;
UInt32 level = 0;
@ -66,6 +72,12 @@ bool MergeTreePartInfo::tryParsePartName(const String & dir_name, MergeTreePartI
return false;
}
/// Sanity check
if (min_block_num > max_block_num)
{
return false;
}
if (!in.eof())
{
if (!checkChar('_', in)

View File

@ -88,4 +88,11 @@ struct MergeTreePartInfo
static constexpr UInt32 MAX_BLOCK_NUMBER = 999999999;
};
/// Describes a detached part: all fields of MergeTreePartInfo plus
/// the prefix that precedes the part name in the directory name.
struct DetachedPartInfo : MergeTreePartInfo
{
    String prefix;
};
}

View File

@ -259,7 +259,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
storage.removePartAndEnqueueFetch(part_name);
/// Delete part locally.
storage.forgetPartAndMoveToDetached(part, "broken_");
storage.forgetPartAndMoveToDetached(part, "broken");
}
}
else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
@ -270,7 +270,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
LOG_ERROR(log, "Unexpected part " << part_name << " in filesystem. Removing.");
storage.forgetPartAndMoveToDetached(part, "unexpected_");
storage.forgetPartAndMoveToDetached(part, "unexpected");
}
else
{

View File

@ -245,7 +245,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
if (part)
{
LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
storage.forgetPartAndMoveToDetached(part, "noquorum_");
storage.forgetPartAndMoveToDetached(part, "noquorum");
storage.queue.removeFromVirtualParts(part->info);
}
}

View File

@ -689,7 +689,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const DataPartPtr & part : unexpected_parts)
{
LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
forgetPartAndMoveToDetached(part, "ignored_", true);
forgetPartAndMoveToDetached(part, "ignored", true);
}
}

View File

@ -0,0 +1,84 @@
#include <Storages/System/StorageSystemDetachedParts.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/StorageSystemPartsBase.h>
namespace DB
{
/**
  * System table 'detached_parts': exposes information about the detached
  * data parts of tables of the MergeTree family.
  * The class derives from IStorage directly instead of StorageSystemPartsBase,
  * because the latter adds the virtual _state column and column aliases,
  * neither of which is needed here.
  */
class StorageSystemDetachedParts :
    public ext::shared_ptr_helper<StorageSystemDetachedParts>,
    public IStorage
{
public:
    std::string getName() const override { return "SystemDetachedParts"; }
    std::string getTableName() const override { return "detached_parts"; }

protected:
    /// Declares the fixed set of columns of the table.
    explicit StorageSystemDetachedParts()
    {
        setColumns(ColumnsDescription{{
            {"database", std::make_shared<DataTypeString>()},
            {"table", std::make_shared<DataTypeString>()},
            {"partition_id", std::make_shared<DataTypeString>()},
            {"name", std::make_shared<DataTypeString>()},
            {"reason", std::make_shared<DataTypeString>()},
            {"min_block_number", std::make_shared<DataTypeInt64>()},
            {"max_block_number", std::make_shared<DataTypeInt64>()},
            {"level", std::make_shared<DataTypeUInt32>()}
        }});
    }

    /// Produces a single block with one row per detached part of every storage
    /// yielded by StoragesInfoStream for this query. All columns of the table
    /// are filled; the list of requested column names is not consulted.
    BlockInputStreams read(
        const Names & /* column_names */,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum /*processed_stage*/,
        const size_t /*max_block_size*/,
        const unsigned /*num_streams*/) override
    {
        StoragesInfoStream storages(query_info, context);

        /// Start from empty columns shaped like the table's sample block.
        const Block header = getSampleBlock();
        MutableColumns result_columns = header.cloneEmptyColumns();

        while (StoragesInfo storage_info = storages.next())
        {
            for (const auto & detached_part : storage_info.data->getDetachedParts())
            {
                /// The order of inserts must follow the column order declared in the constructor.
                size_t col = 0;
                result_columns[col++]->insert(storage_info.database);
                result_columns[col++]->insert(storage_info.table);
                result_columns[col++]->insert(detached_part.partition_id);
                result_columns[col++]->insert(detached_part.getPartName());
                result_columns[col++]->insert(detached_part.prefix);
                result_columns[col++]->insert(detached_part.min_block);
                result_columns[col++]->insert(detached_part.max_block);
                result_columns[col++]->insert(detached_part.level);
            }
        }

        const Block result = header.cloneWithColumns(std::move(result_columns));
        return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(result));
    }
};
/// Factory for the storage that implements the 'detached_parts' system table.
StoragePtr createDetachedPartsTable()
{
    return StorageSystemDetachedParts::create();
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <Storages/IStorage_fwd.h>
namespace DB
{
/// Creates the storage object for the system table that lists detached parts
/// of MergeTree-family tables.
StoragePtr createDetachedPartsTable();
}

View File

@ -51,11 +51,16 @@ StorageSystemParts::StorageSystemParts(const std::string & name)
void StorageSystemParts::processNextStorage(MutableColumns & columns, const StoragesInfo & info, bool has_state_column)
{
using State = MergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
for (size_t part_number = 0; part_number < info.all_parts.size(); ++part_number)
all_parts = info.getParts(all_parts_state, has_state_column);
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
{
const auto & part = info.all_parts[part_number];
auto part_state = info.all_parts_state[part_number];
const auto & part = all_parts[part_number];
auto part_state = all_parts_state[part_number];
MergeTreeDataPart::ColumnSize columns_size = part->getTotalColumnsSize();
size_t i = 0;

View File

@ -17,7 +17,7 @@
namespace DB
{
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names)
bool StorageSystemPartsBase::hasStateColumn(const Names & column_names) const
{
bool has_state_column = false;
Names real_column_names;
@ -37,14 +37,25 @@ bool StorageSystemPartsBase::hasStateColumn(const Names & column_names)
return has_state_column;
}
class StoragesInfoStream
MergeTreeData::DataPartsVector
StoragesInfo::getParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const
{
public:
StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context, bool has_state_column)
: query_id(context.getCurrentQueryId())
, has_state_column(has_state_column)
using State = MergeTreeData::DataPartState;
if (need_inactive_parts)
{
/// If has_state_column is requested, return all states.
if (!has_state_column)
return data->getDataPartsVector({State::Committed, State::Outdated}, &state);
return data->getAllDataPartsVector(&state);
}
return data->getDataPartsVector({State::Committed}, &state);
}
StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context)
: query_id(context.getCurrentQueryId())
{
/// Will apply WHERE to subset of columns and then add more columns.
/// This is kind of complicated, but we use WHERE to do less work.
@ -133,11 +144,11 @@ public:
active_column = block_to_filter.getByName("active").column;
next_row = 0;
}
}
StorageSystemPartsBase::StoragesInfo next()
{
StorageSystemPartsBase::StoragesInfo info;
StoragesInfo StoragesInfoStream::next()
{
StoragesInfo info;
info.storage = nullptr;
while (next_row < rows)
@ -152,12 +163,14 @@ public:
(*table_column)[row].get<String>() == info.table;
};
/// What 'active' value we need.
bool need[2]{}; /// [active]
/// We may have two rows per table which differ in 'active' value.
/// If rows with 'active = 0' were not filtered out, this means we
/// must collect the inactive parts. Remember this fact in StoragesInfo.
for (; next_row < rows && isSameTable(next_row); ++next_row)
{
bool active = (*active_column)[next_row].get<UInt64>() != 0;
need[active] = true;
const auto active = (*active_column)[next_row].get<UInt64>();
if (active == 0)
info.need_inactive_parts = true;
}
info.storage = storages.at(std::make_pair(info.database, info.table));
@ -186,43 +199,11 @@ public:
if (!info.data)
throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR);
using State = MergeTreeDataPart::State;
auto & all_parts_state = info.all_parts_state;
auto & all_parts = info.all_parts;
if (need[0])
{
/// If has_state_column is requested, return all states.
if (!has_state_column)
all_parts = info.data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
else
all_parts = info.data->getAllDataPartsVector(&all_parts_state);
}
else
all_parts = info.data->getDataPartsVector({State::Committed}, &all_parts_state);
break;
}
return info;
}
private:
String query_id;
bool has_state_column;
ColumnPtr database_column;
ColumnPtr table_column;
ColumnPtr active_column;
size_t next_row;
size_t rows;
using StoragesMap = std::map<std::pair<String, String>, StoragePtr>;
StoragesMap storages;
};
}
BlockInputStreams StorageSystemPartsBase::read(
const Names & column_names,
@ -234,7 +215,7 @@ BlockInputStreams StorageSystemPartsBase::read(
{
bool has_state_column = hasStateColumn(column_names);
StoragesInfoStream stream(query_info, context, has_state_column);
StoragesInfoStream stream(query_info, context);
/// Create the result.

View File

@ -11,6 +11,42 @@ namespace DB
class Context;
/// Description of one storage produced by StoragesInfoStream::next().
/// A default-constructed object has a null `storage` and converts to false.
struct StoragesInfo
{
    StoragePtr storage;
    /// NOTE(review): presumably keeps the table structure locked while the info is in use - confirm.
    TableStructureReadLockHolder table_lock;

    String database;
    String table;
    String engine;

    /// True when parts other than Committed ones must be returned by getParts().
    /// It is only ever switched on (never explicitly cleared) by the producer,
    /// so it must start out as false instead of being left indeterminate.
    bool need_inactive_parts = false;

    /// The MergeTreeData that owns the parts; null until the producer fills it in.
    MergeTreeData * data = nullptr;

    /// Tells whether this object describes a storage at all.
    operator bool() const { return storage != nullptr; }

    /// Returns the parts of `data` and writes their states to `state`.
    /// Without need_inactive_parts only Committed parts are returned. With it,
    /// Committed and Outdated parts are returned, or parts in all states when
    /// has_state_column is true.
    MergeTreeData::DataPartsVector getParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const;
};
/** A helper class that enumerates the storages that match given query.
  * Call next() repeatedly; each call yields one StoragesInfo.
  */
class StoragesInfoStream
{
public:
/// Collects the candidate storages and applies the query's WHERE to the subset of columns kept below.
StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context);
/// Returns the next matching storage. The result starts with a null `storage`
/// (i.e. converts to false) and stays that way when no rows are left to process.
StoragesInfo next();
private:
/// Id of the current query, taken from the context.
String query_id;
/// Filtered columns that next() walks row by row.
/// NOTE(review): database_column presumably holds database names, like table_column holds table names - confirm.
ColumnPtr database_column;
ColumnPtr table_column;
/// The 'active' column: a zero value in a row requests inactive parts for that table.
ColumnPtr active_column;
/// Index of the first row not yet consumed by next().
size_t next_row;
/// Upper bound for next_row.
size_t rows;
/// Storages keyed by (database, table).
using StoragesMap = std::map<std::pair<String, String>, StoragePtr>;
StoragesMap storages;
};
/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
*/
@ -31,26 +67,10 @@ public:
size_t max_block_size,
unsigned num_streams) override;
struct StoragesInfo
{
StoragePtr storage;
TableStructureReadLockHolder table_lock;
String database;
String table;
String engine;
MergeTreeData * data;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
operator bool() const { return storage != nullptr; }
};
private:
const std::string name;
bool hasStateColumn(const Names & column_names);
bool hasStateColumn(const Names & column_names) const;
protected:
const FormatSettings format_settings;

View File

@ -80,10 +80,13 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
}
/// Go through the list of parts.
for (size_t part_number = 0; part_number < info.all_parts.size(); ++part_number)
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
all_parts = info.getParts(all_parts_state, has_state_column);
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
{
const auto & part = info.all_parts[part_number];
auto part_state = info.all_parts_state[part_number];
const auto & part = all_parts[part_number];
auto part_state = all_parts_state[part_number];
auto columns_size = part->getTotalColumnsSize();
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.

View File

@ -9,6 +9,7 @@
#include <Storages/System/StorageSystemColumns.h>
#include <Storages/System/StorageSystemDatabases.h>
#include <Storages/System/StorageSystemDataTypeFamilies.h>
#include <Storages/System/StorageSystemDetachedParts.h>
#include <Storages/System/StorageSystemDictionaries.h>
#include <Storages/System/StorageSystemEvents.h>
#include <Storages/System/StorageSystemFormats.h>
@ -64,6 +65,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
{
attachSystemTablesLocal(system_database);
system_database.attachTable("parts", StorageSystemParts::create("parts"));
system_database.attachTable("detached_parts", createDetachedPartsTable());
system_database.attachTable("parts_columns", StorageSystemPartsColumns::create("parts_columns"));
system_database.attachTable("processes", StorageSystemProcesses::create("processes"));
system_database.attachTable("metrics", StorageSystemMetrics::create("metrics"));

View File

@ -8,6 +8,8 @@ Sum before DETACH PARTITION:
15
Sum after DETACH PARTITION:
0
system.detached_parts after DETACH PARTITION:
test not_partitioned all all_1_2_1 1 2 1
*** Partitioned by week ***
Parts before OPTIMIZE:
1999-12-27 19991227_1_1_0

View File

@ -17,6 +17,8 @@ SELECT sum(x) FROM test.not_partitioned;
ALTER TABLE test.not_partitioned DETACH PARTITION ID 'all';
SELECT 'Sum after DETACH PARTITION:';
SELECT sum(x) FROM test.not_partitioned;
SELECT 'system.detached_parts after DETACH PARTITION:';
SELECT * FROM system.detached_parts WHERE table = 'not_partitioned';
DROP TABLE test.not_partitioned;

View File

@ -57,6 +57,14 @@ This table contains a single String column called 'name' the name of a datab
Each database that the server knows about has a corresponding entry in the table.
This system table is used for implementing the `SHOW DATABASES` query.
## system.detached_parts
Contains information about detached parts of
[MergeTree](table_engines/mergetree.md) tables. The `reason` column specifies
why the part was detached. For parts detached by the user, the reason is empty. Such
parts can be attached with the [ALTER TABLE ATTACH PARTITION|PART](../query_language/query_language/alter/#alter_attach-partition)
command. For the description of the other columns, see [system.parts](#system_tables-parts).
## system.dictionaries
Contains information about external dictionaries.