2015-01-27 00:52:03 +00:00
|
|
|
|
#include <DB/Storages/StorageSet.h>
|
|
|
|
|
#include <DB/IO/ReadBufferFromFile.h>
|
|
|
|
|
#include <DB/IO/CompressedReadBuffer.h>
|
|
|
|
|
#include <DB/DataStreams/NativeBlockInputStream.h>
|
|
|
|
|
#include <DB/Common/escapeForFileName.h>
|
2015-04-16 06:12:35 +00:00
|
|
|
|
#include <Poco/DirectoryIterator.h>
|
2015-01-27 00:52:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
|
|
|
|
|
const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
|
|
|
|
|
: table(table_),
|
|
|
|
|
backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
|
|
|
|
|
backup_file_name(backup_file_name_),
|
|
|
|
|
backup_buf(backup_tmp_path + backup_file_name),
|
|
|
|
|
compressed_backup_buf(backup_buf),
|
|
|
|
|
backup_stream(compressed_backup_buf)
|
2015-01-27 00:52:03 +00:00
|
|
|
|
{
|
2015-01-28 00:08:45 +00:00
|
|
|
|
}
|
2015-01-27 00:52:03 +00:00
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
void SetOrJoinBlockOutputStream::write(const Block & block)
|
|
|
|
|
{
|
2015-01-30 18:57:44 +00:00
|
|
|
|
/// Сортируем столбцы в блоке. Это нужно, так как Set и Join рассчитывают на одинаковый порядок столбцов в разных блоках.
|
2015-02-13 20:37:30 +00:00
|
|
|
|
Block sorted_block = block.sortColumns();
|
2015-01-30 18:57:44 +00:00
|
|
|
|
|
|
|
|
|
table.insertBlock(sorted_block);
|
|
|
|
|
backup_stream.write(sorted_block);
|
2015-01-28 00:08:45 +00:00
|
|
|
|
}
|
2015-01-27 00:52:03 +00:00
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
void SetOrJoinBlockOutputStream::writeSuffix()
|
|
|
|
|
{
|
|
|
|
|
backup_stream.flush();
|
|
|
|
|
compressed_backup_buf.next();
|
|
|
|
|
backup_buf.next();
|
2015-01-27 00:52:03 +00:00
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
|
|
|
|
|
}
|
2015-01-27 00:52:03 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
BlockOutputStreamPtr StorageSetOrJoinBase::write(ASTPtr query)
|
2015-01-27 00:52:03 +00:00
|
|
|
|
{
|
|
|
|
|
++increment;
|
2015-01-28 00:08:45 +00:00
|
|
|
|
return new SetOrJoinBlockOutputStream(*this, path, path + "tmp/", toString(increment) + ".bin");
|
2015-01-27 00:52:03 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
StorageSetOrJoinBase::StorageSetOrJoinBase(
|
2015-01-27 00:52:03 +00:00
|
|
|
|
const String & path_,
|
|
|
|
|
const String & name_,
|
|
|
|
|
NamesAndTypesListPtr columns_,
|
|
|
|
|
const NamesAndTypesList & materialized_columns_,
|
|
|
|
|
const NamesAndTypesList & alias_columns_,
|
|
|
|
|
const ColumnDefaults & column_defaults_)
|
|
|
|
|
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
|
|
|
|
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
|
2015-01-28 00:08:45 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StorageSet::StorageSet(
|
|
|
|
|
const String & path_,
|
|
|
|
|
const String & name_,
|
|
|
|
|
NamesAndTypesListPtr columns_,
|
|
|
|
|
const NamesAndTypesList & materialized_columns_,
|
|
|
|
|
const NamesAndTypesList & alias_columns_,
|
|
|
|
|
const ColumnDefaults & column_defaults_)
|
|
|
|
|
: StorageSetOrJoinBase{path_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_}
|
2015-01-27 00:52:03 +00:00
|
|
|
|
{
|
|
|
|
|
restore();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
void StorageSetOrJoinBase::restore()
|
2015-01-27 00:52:03 +00:00
|
|
|
|
{
|
|
|
|
|
Poco::File tmp_dir(path + "tmp/");
|
|
|
|
|
if (!tmp_dir.exists())
|
|
|
|
|
{
|
|
|
|
|
tmp_dir.createDirectories();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
constexpr auto file_suffix = ".bin";
|
|
|
|
|
constexpr auto file_suffix_size = strlen(file_suffix);
|
|
|
|
|
|
|
|
|
|
Poco::DirectoryIterator dir_end;
|
|
|
|
|
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
|
|
|
|
|
{
|
|
|
|
|
const auto & name = dir_it.name();
|
|
|
|
|
|
|
|
|
|
if (dir_it->isFile()
|
|
|
|
|
&& name.size() > file_suffix_size
|
|
|
|
|
&& 0 == name.compare(name.size() - file_suffix_size, file_suffix_size, file_suffix)
|
|
|
|
|
&& dir_it->getSize() > 0)
|
|
|
|
|
{
|
|
|
|
|
/// Вычисляем максимальный номер имеющихся файлов с бэкапом, чтобы добавлять следующие файлы с большими номерами.
|
|
|
|
|
UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
|
|
|
|
|
if (file_num > increment)
|
|
|
|
|
increment = file_num;
|
|
|
|
|
|
2015-05-28 03:49:28 +00:00
|
|
|
|
restoreFromFile(dir_it->path());
|
2015-01-27 00:52:03 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2015-05-28 03:49:28 +00:00
|
|
|
|
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
|
2015-01-27 00:52:03 +00:00
|
|
|
|
{
|
|
|
|
|
ReadBufferFromFile backup_buf(file_path);
|
|
|
|
|
CompressedReadBuffer compressed_backup_buf(backup_buf);
|
2015-05-28 03:49:28 +00:00
|
|
|
|
NativeBlockInputStream backup_stream(compressed_backup_buf);
|
2015-01-27 00:52:03 +00:00
|
|
|
|
|
|
|
|
|
backup_stream.readPrefix();
|
|
|
|
|
while (Block block = backup_stream.read())
|
2015-01-28 00:08:45 +00:00
|
|
|
|
insertBlock(block);
|
2015-01-27 00:52:03 +00:00
|
|
|
|
backup_stream.readSuffix();
|
|
|
|
|
|
|
|
|
|
/// TODO Добавить скорость, сжатые байты, объём данных в памяти, коэффициент сжатия... Обобщить всё логгирование статистики в проекте.
|
2015-01-28 00:08:45 +00:00
|
|
|
|
LOG_INFO(&Logger::get("StorageSetOrJoinBase"), std::fixed << std::setprecision(2)
|
2015-01-27 00:52:03 +00:00
|
|
|
|
<< "Loaded from backup file " << file_path << ". "
|
|
|
|
|
<< backup_stream.getInfo().rows << " rows, "
|
|
|
|
|
<< backup_stream.getInfo().bytes / 1048576.0 << " MiB. "
|
2015-01-28 00:08:45 +00:00
|
|
|
|
<< "State has " << getSize() << " unique rows.");
|
2015-01-27 00:52:03 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2015-01-28 00:08:45 +00:00
|
|
|
|
void StorageSetOrJoinBase::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
|
2015-01-27 00:52:03 +00:00
|
|
|
|
{
|
|
|
|
|
/// Переименовываем директорию с данными.
|
|
|
|
|
String new_path = new_path_to_db + escapeForFileName(new_table_name);
|
|
|
|
|
Poco::File(path).renameTo(new_path);
|
|
|
|
|
|
|
|
|
|
path = new_path + "/";
|
|
|
|
|
name = new_table_name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|