Update system.parts table. [#CLICKHOUSE-3178]

This commit is contained in:
Vitaliy Lyudvichenko 2017-10-06 19:48:41 +03:00
parent 483dfd1b44
commit 602560cb2e
3 changed files with 39 additions and 22 deletions

View File

@ -1692,11 +1692,6 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
return nullptr; return nullptr;
} }
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
return getPartIfExists(part_name, {DataPartState::Committed});
}
MergeTreeData::DataPartPtr MergeTreeData::getShardedPartIfExists(const String & part_name, size_t shard_no) MergeTreeData::DataPartPtr MergeTreeData::getShardedPartIfExists(const String & part_name, size_t shard_no)
{ {
const MutableDataPartPtr & part_from_shard = per_shard_data_parts.at(shard_no); const MutableDataPartPtr & part_from_shard = per_shard_data_parts.at(shard_no);
@ -1983,6 +1978,21 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartS
return res; return res;
} }
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const MergeTreeData::DataPartStates & affordable_states,
MergeTreeData::DataPartStateVector & out_states_snapshot) const
{
DataPartsVector res;
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::copy_if(data_parts.begin(), data_parts.end(), std::back_inserter(res), DataPart::getStatesFilter(affordable_states));
out_states_snapshot.resize(res.size());
for (size_t i = 0; i < res.size(); ++i)
out_states_snapshot[i] = res[i]->state;
}
return res;
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{ {
DataParts res; DataParts res;

View File

@ -102,6 +102,7 @@ public:
using DataPartState = MergeTreeDataPart::State; using DataPartState = MergeTreeDataPart::State;
using DataPartStates = std::initializer_list<DataPartState>; using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
struct DataPartPtrLess struct DataPartPtrLess
{ {
@ -310,6 +311,7 @@ public:
/// Returns a copy of the list so that the caller shouldn't worry about locks. /// Returns a copy of the list so that the caller shouldn't worry about locks.
DataParts getDataParts(const DataPartStates & affordable_states) const; DataParts getDataParts(const DataPartStates & affordable_states) const;
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const; DataPartsVector getDataPartsVector(const DataPartStates & affordable_states) const;
DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector & out_states_snapshot) const;
/// Returns a virtual container iteration only through parts with specified states /// Returns a virtual container iteration only through parts with specified states
decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const decltype(auto) getDataPartsRange(const DataPartStates & affordable_states) const
@ -328,10 +330,7 @@ public:
DataPartPtr getActiveContainingPart(const String & part_name); DataPartPtr getActiveContainingPart(const String & part_name);
/// Returns the part with the given name (and state) or nullptr if no such part. /// Returns the part with the given name (and state) or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states = {DataPartState::Committed});
/// Returns committed part with the given name or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name);
/// Total size of active parts in bytes. /// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const; size_t getTotalActiveSizeInBytes() const;

View File

@ -174,33 +174,41 @@ BlockInputStreams StorageSystemParts::read(
*/ */
if (e.code() == ErrorCodes::TABLE_IS_DROPPED) if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
continue; continue;
else
throw; throw;
} }
String engine = storage->getName(); String engine = storage->getName();
MergeTreeData * data = nullptr; MergeTreeData * data = nullptr;
if (StorageMergeTree * merge_tree = dynamic_cast<StorageMergeTree *>(&*storage)) if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
{ {
data = &merge_tree->getData(); data = &merge_tree->getData();
} }
else if (StorageReplicatedMergeTree * replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage)) else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
{ {
data = &replicated_merge_tree->getData(); data = &replicated_merge_tree->getData();
} }
MergeTreeData::DataParts active_parts = data->getDataParts();
MergeTreeData::DataParts all_parts;
if (need[0])
all_parts = data->getAllDataParts();
else else
all_parts = active_parts; {
throw Exception("Unknown engine " + engine, ErrorCodes::LOGICAL_ERROR);
}
using State = MergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::DataPartsVector all_parts;
if (need[0])
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, all_parts_state);
else
all_parts = data->getDataPartsVector({State::Committed}, all_parts_state);
/// Finally, we'll go through the list of parts. /// Finally, we'll go through the list of parts.
for (const MergeTreeData::DataPartPtr & part : all_parts) for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
{ {
const auto & part = all_parts[part_number];
auto part_state = all_parts_state[part_number];
size_t i = 0; size_t i = 0;
{ {
WriteBufferFromOwnString out; WriteBufferFromOwnString out;
@ -208,7 +216,7 @@ BlockInputStreams StorageSystemParts::read(
block.getByPosition(i++).column->insert(out.str()); block.getByPosition(i++).column->insert(out.str());
} }
block.getByPosition(i++).column->insert(part->name); block.getByPosition(i++).column->insert(part->name);
block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part))); block.getByPosition(i++).column->insert(static_cast<UInt64>(part_state == State::Committed));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count)); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count));
size_t marks_size = 0; size_t marks_size = 0;
@ -227,7 +235,7 @@ BlockInputStreams StorageSystemParts::read(
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time)); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time));
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts. /// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.
block.getByPosition(i++).column->insert(static_cast<UInt64>(part.use_count() - (active_parts.count(part) ? 2 : 1))); block.getByPosition(i++).column->insert(static_cast<UInt64>(part.use_count() - 1));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMinDate())); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMinDate()));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMaxDate())); block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMaxDate()));