This commit is contained in:
Michael Kolupaev 2014-07-15 13:56:17 +04:00
parent 8eb2a7218b
commit 01094ec78d
9 changed files with 42 additions and 25 deletions

View File

@ -2,11 +2,15 @@
#include <vector>
#include <string>
#include <unordered_set>
#include <unordered_map>
namespace DB
{
typedef std::vector<std::string> Names;
typedef std::unordered_set<std::string> NameSet;
typedef std::unordered_map<std::string, std::string> NameToNameMap;
}

View File

@ -126,17 +126,22 @@ public:
}
/// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы.
NamesAndTypesList intersect(const Names & names) const
NamesAndTypesList filter(const NameSet & names) const
{
std::set<String> name_set(names.begin(), names.end());
NamesAndTypesList res;
for (const NameAndTypePair & column : *this)
{
if (name_set.count(column.name))
if (names.count(column.name))
res.push_back(column);
}
return res;
}
/// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы.
NamesAndTypesList filter(const Names & names) const
{
return filter(NameSet(names.begin(), names.end()));
}
};
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;

View File

@ -18,8 +18,6 @@ typedef Poco::SharedPtr<IFunction> FunctionPtr;
typedef std::pair<std::string, std::string> NameWithAlias;
typedef std::vector<NameWithAlias> NamesWithAliases;
typedef std::unordered_set<String> NameSet;
typedef std::unordered_map<String, String> NameToNameMap;
class Join;

View File

@ -63,13 +63,13 @@ public:
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
@ -78,7 +78,7 @@ public:
storage.zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
storage.checkPartAndAddToZooKeeper(part, -1, ops);
storage.checkPartAndAddToZooKeeper(part, ops);
ops.push_back(new zkutil::Op::Create(
storage.replica_path + "/log/log-",
log_entry.toString(),

View File

@ -25,8 +25,11 @@ public:
MergeTreeData::DataPartPtr part = findPart(part_name);
Poco::ScopedReadRWLock part_lock(part->columns_lock);
/// Список файлов возьмем из списка контрольных сумм.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// Добавим файлы, которых нет в списке контрольных сумм.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];

View File

@ -306,7 +306,7 @@ private:
* Кладет в ops действия, добавляющие данные о куске в ZooKeeper.
* Вызывать под TableStructureLock.
*/
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, int expected_columns_version, zkutil::Ops & ops);
void checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops);
void clearOldParts();

View File

@ -476,7 +476,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPart
/// Запишем обновленный список столбцов во временный файл.
{
transaction->new_columns = new_columns.intersect(part->columns.getNames());
transaction->new_columns = new_columns.filter(part->columns.getNames());
WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";

View File

@ -253,10 +253,17 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
Names all_column_names;
NameSet union_columns_set;
for (const MergeTreeData::DataPartPtr & part : parts)
{
Poco::ScopedReadRWLock part_lock(part->columns_lock);
Names part_columns = part->columns.getNames();
union_columns_set.insert(part_columns.begin(), part_columns.end());
}
NamesAndTypesList columns_list = data.getColumnsList();
for (const auto & it : columns_list)
all_column_names.push_back(it.name);
NamesAndTypesList union_columns = columns_list.filter(union_columns_set);
Names union_column_names = union_columns.getNames();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
@ -271,7 +278,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, all_column_names, data,
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, ""), data.getPrimaryExpression()));
}
@ -304,8 +311,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
NamesAndTypesList columns = data.getColumnsList();
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, columns);
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, union_columns);
merged_stream->readPrefix();
to->writePrefix();
@ -318,7 +324,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
throw Exception("Canceled merging parts", ErrorCodes::ABORTED);
merged_stream->readSuffix();
new_data_part->columns = columns;
new_data_part->columns = union_columns;
new_data_part->checksums = to->writeSuffixAndGetChecksums();
new_data_part->index.swap(to->getIndex());

View File

@ -433,7 +433,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, -1, ops);
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
}
@ -486,11 +486,10 @@ void StorageReplicatedMergeTree::initVirtualParts()
}
}
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(
MergeTreeData::DataPartPtr part, int expected_columns_version, zkutil::Ops & ops)
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
check(part->columns);
expected_columns_version = columns_version;
int expected_columns_version = columns_version;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
std::random_shuffle(replicas.begin(), replicas.end());
@ -551,6 +550,7 @@ void StorageReplicatedMergeTree::clearOldParts()
for (const String & name : parts)
{
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
int32_t code = zookeeper->tryMulti(ops);
@ -622,6 +622,7 @@ void StorageReplicatedMergeTree::clearOldBlocks()
{
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
zookeeper->multi(ops);
@ -849,7 +850,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, -1, ops);
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
@ -1235,7 +1236,7 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, -1, ops);
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();