Michael Kolupaev 2014-07-09 17:39:19 +04:00
parent 0a67d151bd
commit a6fd9cd2bd
9 changed files with 149 additions and 50 deletions

View File

@@ -3,10 +3,12 @@
#include <map>
#include <list>
#include <string>
#include <set>
#include <Poco/SharedPtr.h>
#include <DB/DataTypes/IDataType.h>
#include <DB/DataTypes/DataTypeFactory.h>
namespace DB
@@ -33,8 +35,55 @@ struct NameAndTypePair
}
};
typedef std::list<NameAndTypePair> NamesAndTypesList;
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;
typedef std::vector<NameAndTypePair> NamesAndTypes;
class NamesAndTypesList : public std::list<NameAndTypePair>
{
public:
using std::list<NameAndTypePair>::list;
void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory)
{
DB::assertString("columns format version: 1\n", buf);
size_t count;
DB::readText(count, buf);
DB::assertString(" columns:\n", buf);
resize(count);
for (NameAndTypePair & it : *this)
{
DB::readBackQuotedString(it.name, buf);
DB::assertString(" ", buf);
String type_name;
DB::readString(type_name, buf);
it.type = data_type_factory.get(type_name);
DB::assertString("\n", buf);
}
}
void writeText(WriteBuffer & buf)
{
DB::writeString("columns format version: 1\n", buf);
DB::writeText(size(), buf);
DB::writeString(" columns:\n", buf);
for (const auto & it : *this)
{
DB::writeBackQuotedString(it.name, buf);
DB::writeChar(' ', buf);
DB::writeString(it.type->getName(), buf);
DB::writeChar('\n', buf);
}
}
/// All elements of rhs must be distinct.
bool isSubsetOf(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size();
}
};
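The sort-and-unique test in isSubsetOf above is terse; a minimal standalone sketch of the same idea over plain strings (an illustration, not part of this commit) may help:

#include <algorithm>
#include <string>
#include <vector>

/// Returns true iff every element of lhs also occurs in rhs.
/// Assumes the elements of rhs are distinct, as the comment above requires.
bool isSubsetOf(const std::vector<std::string> & lhs, const std::vector<std::string> & rhs)
{
    std::vector<std::string> v(rhs.begin(), rhs.end());
    v.insert(v.end(), lhs.begin(), lhs.end());
    std::sort(v.begin(), v.end());
    /// After sorting, each element of lhs that occurs in rhs sits next to its copy,
    /// so std::unique leaves exactly rhs.size() distinct values iff lhs contributed nothing new.
    return std::unique(v.begin(), v.end()) == v.begin() + rhs.size();
}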
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;
}
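For reference, the text format that readText and writeText above agree on - and that appears on disk as columns.txt in the changes below - looks like this (the column names and types are illustrative):

columns format version: 1
3 columns:
`EventDate` Date
`UserID` UInt64
`Title` String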

View File

@@ -40,6 +40,7 @@ namespace DB
* / min-date _ max-date _ min-id _ max-id _ level / - the directory containing a part.
* Inside the part directory:
* checksums.txt - the list of files with their sizes and checksums.
* columns.txt - the list of columns with their types.
* primary.idx - the index file.
* Column.bin - the column data.
* Column.mrk - marks pointing to where to start reading in order to skip n * k rows.
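Taken together, the layout described above means that a part directory for a table with columns EventDate and UserID (illustrative names; the directory name follows the min-date _ max-date _ min-id _ max-id _ level pattern) would contain something like:

20140701_20140731_0_5_1/
    checksums.txt
    columns.txt
    primary.idx
    EventDate.bin
    EventDate.mrk
    UserID.bin
    UserID.mrk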
@@ -220,6 +221,10 @@ public:
Checksums checksums;
/// Description of the columns.
NamesAndTypesList columns;
Poco::RWLock columns_lock;
/// NOTE The marks could be loaded into RAM as well.
/// Compute the total size of the whole directory with all its files.
@@ -261,7 +266,7 @@ public:
{
/// The size is measured in the number of marks.
if (!size)
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(storage.columns->front().name) + ".mrk")
size = Poco::File(storage.full_path + name + "/" + escapeForFileName(columns.front().name) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
size_t key_size = storage.sort_descr.size();
@@ -282,15 +287,34 @@
}
/// Read the checksums, if they exist.
bool loadChecksums()
void loadChecksums()
{
String path = storage.full_path + name + "/checksums.txt";
if (!Poco::File(path).exists())
return false;
{
if (storage.require_part_metadata)
throw Exception("No checksums.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
return;
}
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
if (checksums.readText(file))
assertEOF(file);
return true;
}
void loadColumns()
{
String path = storage.full_path + name + "/columns.txt";
if (!Poco::File(path).exists())
{
if (storage.require_part_metadata)
throw Exception("No columns.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
columns = *storage.columns;
return;
}
ReadBufferFromFile file(path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
columns.readText(file, storage.context.getDataTypeFactory());
}
void checkNotBroken()
@@ -299,6 +323,17 @@ public:
if (!checksums.empty())
{
if (!checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
for (const NameAndTypePair & it : columns)
{
String name = escapeForFileName(it.name);
if (!checksums.files.count(name + ".mrk") ||
!checksums.files.count(name + ".bin"))
throw Exception("No .mrk or .bin file checksum for column " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
}
checksums.checkSizes(path + "/");
}
else
@@ -313,9 +348,9 @@ public:
/// Check that all marks are non-empty and have the same size.
ssize_t marks_size = -1;
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
for (const NameAndTypePair & it : columns)
{
Poco::File marks_file(path + "/" + escapeForFileName(it->name) + ".mrk");
Poco::File marks_file(path + "/" + escapeForFileName(it.name) + ".mrk");
/// When a new column is added to the table, .mrk files are not created. We will not delete anything.
if (!marks_file.exists())
@@ -401,6 +436,7 @@ public:
* primary_expr_ast - the expression to sort by;
* date_column_name - the name of the column containing the date;
* index_granularity - for how many rows a single index value is written.
* require_part_metadata - whether checksums.txt and columns.txt must be present in the part directory
* (StorageReplicatedMergeTree passes true, StorageMergeTree and unreplicated data pass false).
*/
MergeTreeData( const String & full_path_, NamesAndTypesListPtr columns_,
const Context & context_,
@@ -411,7 +447,9 @@ public:
Mode mode_,
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_);
const String & log_name_,
bool require_part_metadata_
);
std::string getModePrefix() const;
@@ -422,7 +460,7 @@ public:
UInt64 getMaxDataPartIndex();
std::string getTableName() const {
return "abc";//throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR);
}
const NamesAndTypesList & getColumnsList() const { return *columns; }
@@ -515,6 +553,8 @@ public:
const ASTPtr primary_expr_ast;
private:
bool require_part_metadata;
ExpressionActionsPtr primary_expr;
SortDescription sort_descr;
Block primary_key_sample;

View File

@@ -211,7 +211,6 @@ public:
index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = new HashingWriteBuffer(*index_file_stream);
columns_list = storage.getColumnsList();
for (const auto & it : columns_list)
addStream(part_path, it.name, *it.type);
}
@@ -283,11 +282,18 @@ public:
/// The part is empty - all rows have been deleted.
Poco::File(part_path).remove(true);
checksums.files.clear();
return checksums;
}
else
{
/// Write the file with the description of the columns.
WriteBufferFromFile out(part_path + "columns.txt", 4096);
columns_list.writeText(out);
}
{
/// Write the file with the checksums.
WriteBufferFromFile out(part_path + "checksums.txt", 1024);
WriteBufferFromFile out(part_path + "checksums.txt", 4096);
checksums.writeText(out);
}

View File

@@ -28,6 +28,7 @@ public:
/// We will take the list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
/// operator[] inserts empty entries, so checksums.txt and columns.txt themselves appear in the list of files.
checksums.files["checksums.txt"];
checksums.files["columns.txt"];
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
@@ -107,7 +108,8 @@ public:
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + part_path + file_name + " transferred from " + replica_path);
if (file_name != "checksums.txt")
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}
@@ -117,8 +119,9 @@ public:
ActiveDataPartSet::parsePartName(part_name, *new_data_part);
new_data_part->name = "tmp_" + part_name;
new_data_part->modification_time = time(0);
new_data_part->loadIndex();
new_data_part->loadColumns();
new_data_part->loadChecksums();
new_data_part->loadIndex();
new_data_part->checksums.checkEqual(checksums, false);

View File

@@ -9,6 +9,7 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <algorithm>
@@ -25,12 +26,14 @@ MergeTreeData::MergeTreeData(
Mode mode_,
const String & sign_column_,
const MergeTreeSettings & settings_,
const String & log_name_)
const String & log_name_,
bool require_part_metadata_)
: context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
mode(mode_), sign_column(sign_column_),
settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_), log_name(log_name_),
log(&Logger::get(log_name + " (Data)"))
{
@@ -128,8 +131,9 @@ void MergeTreeData::loadDataParts()
try
{
part->loadIndex();
part->loadColumns();
part->loadChecksums();
part->loadIndex();
part->checkNotBroken();
}
catch (...)
@@ -318,6 +322,8 @@ void MergeTreeData::dropAllData()
void MergeTreeData::removeColumnFiles(String column_name, bool remove_array_size_files)
{
throw Exception("removeColumnFiles is closed for reconstruction. Please come back later.", ErrorCodes::NOT_IMPLEMENTED);
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
@@ -335,44 +341,32 @@ void MergeTreeData::removeColumnFiles(String column_name, bool remove_array_size
Poco::RegularExpression re(column_name + "(?:(?:\\.|\\%2E).+){0,1}" +"(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)");
/// Loop over all part directories
Poco::RegularExpression::MatchVec matches;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it_dir = Poco::DirectoryIterator(full_path); it_dir != end; ++it_dir)
for (auto & part : all_data_parts)
{
std::string dir_name = it_dir.name();
if (!ActiveDataPartSet::isPartDirectory(dir_name, matches))
continue;
/// Loop over each file in the part directory
String full_dir_name = full_path + dir_name + "/";
for (Poco::DirectoryIterator it_file(full_dir_name); it_file != end; ++it_file)
String full_dir_name = full_path + part->name + "/";
for (Poco::DirectoryIterator it_file(full_dir_name); it_file != Poco::DirectoryIterator(); ++it_file)
{
if (re.match(it_file.name()))
{
Poco::File file(full_dir_name + it_file.name());
if (file.exists())
file.remove();
file.remove();
}
}
/// Remove the superfluous columns from checksums.txt
for (auto & part : all_data_parts)
for (auto it = part->checksums.files.lower_bound(column_name);
(it != part->checksums.files.end()) && (it->first.substr(0, column_name.size()) == column_name);)
{
if (!part)
continue;
for (auto it = part->checksums.files.lower_bound(column_name);
(it != part->checksums.files.end()) && (it->first.substr(0, column_name.size()) == column_name);)
{
if (re.match(it->first))
it = const_cast<DataPart::Checksums::FileChecksums &>(part->checksums.files).erase(it);
else
++it;
}
/// Write the file with the checksums.
WriteBufferFromFile out(full_path + part->name + "/" + "checksums.txt", 1024);
part->checksums.writeText(out);
if (re.match(it->first))
it = const_cast<DataPart::Checksums::FileChecksums &>(part->checksums.files).erase(it);
else
++it;
}
/// Write the file with the checksums.
WriteBufferFromFile out(full_path + part->name + "/" + "checksums.txt", 1024);
part->checksums.writeText(out);
}
}
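To make the file-matching expression above concrete, here is a small self-contained sketch (it assumes only Poco; the column name arr and the file names are made up) showing which files the pattern would select for removal:

#include <iostream>
#include <Poco/RegularExpression.h>

int main()
{
    /// The same pattern removeColumnFiles builds, instantiated for a column named "arr".
    Poco::RegularExpression re("arr(?:(?:\\.|\\%2E).+){0,1}(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)");
    for (const char * name : {"arr.bin", "arr.mrk", "arr.size0.bin", "arr%2Efield.mrk", "other.bin", "arr.txt"})
        std::cout << name << " -> " << (re.match(name) ? "removed" : "kept") << "\n";
    return 0;
}

The first four names match (plain column files, .size files for arrays, and nested columns with the dot escaped as %2E); the last two are left alone.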
@@ -411,6 +405,8 @@ static bool namesWithDotEqual(const String & name_with_dot, const NameAndTypePai
void MergeTreeData::alter(const ASTAlterQuery::Parameters & params)
{
throw Exception("ALTER is closed for reconstruction. Please come back later.", ErrorCodes::NOT_IMPLEMENTED);
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
@@ -436,6 +432,8 @@ void MergeTreeData::alter(const ASTAlterQuery::Parameters & params)
void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
{
throw Exception("ALTER is closed for reconstruction. Please come back later.", ErrorCodes::NOT_IMPLEMENTED);
DataPartsVector parts;
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

View File

@@ -304,7 +304,8 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, data.getColumnsList());
NamesAndTypesList columns = data.getColumnsList();
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, columns);
merged_stream->readPrefix();
to->writePrefix();
@@ -317,8 +318,8 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
throw Exception("Canceled merging parts", ErrorCodes::ABORTED);
merged_stream->readSuffix();
new_data_part->columns = columns;
new_data_part->checksums = to->writeSuffixAndGetChecksums();
new_data_part->index.swap(to->getIndex());
/// For convenience, even CollapsingSortedBlockInputStream cannot return zero rows.

View File

@@ -97,7 +97,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
/// Sort the block.
stableSortBlock(block, sort_descr);
MergedBlockOutputStream out(data, part_tmp_path, block.getColumnsList());
ColumnsWithNameAndType columns = block.getColumnsList();
MergedBlockOutputStream out(data, part_tmp_path, columns);
out.getIndex().reserve(part_size * sort_descr.size());
out.writePrefix();
@@ -115,8 +116,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
new_data_part->modification_time = time(0);
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
new_data_part->index.swap(out.getIndex());
new_data_part->columns = columns;
new_data_part->checksums = checksums;
new_data_part->index.swap(out.getIndex());
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(part_tmp_path);
return new_data_part;

View File

@@ -19,7 +19,7 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & database
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), increment(full_path + "increment.txt"),
background_pool(context_.getBackgroundPool()),
data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + name),
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + name, false),
reader(data), writer(data), merger(data),
log(&Logger::get(database_name_ + "." + name + " (StorageMergeTree)")),
shutdown_called(false)

View File

@@ -36,7 +36,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
replica_name(replica_name_),
data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name),
index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true),
reader(data), writer(data), merger(data), fetcher(data),
log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
shutdown_event(false), permanent_shutdown_event(false)
@@ -74,7 +74,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
LOG_INFO(log, "Have unreplicated data");
unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_, context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_,
database_name_ + "." + table_name + "[unreplicated]"));
database_name_ + "." + table_name + "[unreplicated]", false));
unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data));
unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
}