mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
dbms: added StorageSet [#METR-2944].
This commit is contained in:
parent
a1eb599105
commit
fed642cd7c
@ -13,8 +13,8 @@ namespace DB
|
|||||||
class MaterializingBlockOutputStream : public IBlockOutputStream
|
class MaterializingBlockOutputStream : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output) : output{output}
|
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output)
|
||||||
{}
|
: output{output} {}
|
||||||
|
|
||||||
void write(const Block & original_block) override
|
void write(const Block & original_block) override
|
||||||
{
|
{
|
||||||
@ -31,6 +31,12 @@ public:
|
|||||||
output->write(block);
|
output->write(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void flush() override { output->flush(); }
|
||||||
|
|
||||||
|
void writePrefix() override { output->writePrefix(); }
|
||||||
|
void writeSuffix() override { output->writeSuffix(); }
|
||||||
|
|
||||||
|
private:
|
||||||
BlockOutputStreamPtr output;
|
BlockOutputStreamPtr output;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -49,6 +49,12 @@ public:
|
|||||||
output->write(block);
|
output->write(block);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void flush() override
|
||||||
|
{
|
||||||
|
if (output)
|
||||||
|
output->flush();
|
||||||
|
}
|
||||||
|
|
||||||
void writePrefix() override
|
void writePrefix() override
|
||||||
{
|
{
|
||||||
if (output)
|
if (output)
|
||||||
|
@ -78,7 +78,10 @@ public:
|
|||||||
*/
|
*/
|
||||||
void joinBlock(Block & block);
|
void joinBlock(Block & block);
|
||||||
|
|
||||||
size_t size() const { return getTotalRowCount(); }
|
/// Считает суммарное число ключей во всех Join'ах
|
||||||
|
size_t getTotalRowCount() const;
|
||||||
|
/// Считает суммарный размер в байтах буфферов всех Join'ов + размер string_pool'а
|
||||||
|
size_t getTotalByteCount() const;
|
||||||
|
|
||||||
|
|
||||||
/// Ссылка на строку в блоке.
|
/// Ссылка на строку в блоке.
|
||||||
@ -176,11 +179,6 @@ private:
|
|||||||
|
|
||||||
/// Проверить не превышены ли допустимые размеры множества
|
/// Проверить не превышены ли допустимые размеры множества
|
||||||
bool checkSizeLimits() const;
|
bool checkSizeLimits() const;
|
||||||
|
|
||||||
/// Считает суммарное число ключей во всех Join'ах
|
|
||||||
size_t getTotalRowCount() const;
|
|
||||||
/// Считает суммарный размер в байтах буфферов всех Join'ов + размер string_pool'а
|
|
||||||
size_t getTotalByteCount() const;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef Poco::SharedPtr<Join> JoinPtr;
|
typedef Poco::SharedPtr<Join> JoinPtr;
|
||||||
|
@ -47,9 +47,12 @@ public:
|
|||||||
void createFromAST(DataTypes & types, ASTPtr node, bool create_ordered_set);
|
void createFromAST(DataTypes & types, ASTPtr node, bool create_ordered_set);
|
||||||
|
|
||||||
// Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять.
|
// Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять.
|
||||||
bool insertFromBlock(Block & block, bool create_ordered_set = false);
|
bool insertFromBlock(const Block & block, bool create_ordered_set = false);
|
||||||
|
|
||||||
size_t size() const { return getTotalRowCount(); }
|
/// Считает суммарное число ключей во всех Set'ах
|
||||||
|
size_t getTotalRowCount() const;
|
||||||
|
/// Считает суммарный размер в байтах буфферов всех Set'ов + размер string_pool'а
|
||||||
|
size_t getTotalByteCount() const;
|
||||||
|
|
||||||
/** Для указанных столбцов блока проверить принадлежность их значений множеству.
|
/** Для указанных столбцов блока проверить принадлежность их значений множеству.
|
||||||
* Записать результат в столбец в позиции result.
|
* Записать результат в столбец в позиции result.
|
||||||
@ -152,16 +155,18 @@ private:
|
|||||||
/// Проверить не превышены ли допустимые размеры множества ключей
|
/// Проверить не превышены ли допустимые размеры множества ключей
|
||||||
bool checkSetSizeLimits() const;
|
bool checkSetSizeLimits() const;
|
||||||
|
|
||||||
/// Считает суммарное число ключей во всех Set'ах
|
|
||||||
size_t getTotalRowCount() const;
|
|
||||||
/// Считает суммарный размер в байтах буфферов всех Set'ов + размер string_pool'а
|
|
||||||
size_t getTotalByteCount() const;
|
|
||||||
|
|
||||||
/// вектор упорядоченных элементов Set
|
/// вектор упорядоченных элементов Set
|
||||||
/// нужен для работы индекса по первичному ключу в секции In
|
/// нужен для работы индекса по первичному ключу в секции In
|
||||||
typedef std::vector<Field> OrderedSetElements;
|
typedef std::vector<Field> OrderedSetElements;
|
||||||
typedef std::unique_ptr<OrderedSetElements> OrderedSetElementsPtr;
|
typedef std::unique_ptr<OrderedSetElements> OrderedSetElementsPtr;
|
||||||
OrderedSetElementsPtr ordered_set_elements;
|
OrderedSetElementsPtr ordered_set_elements;
|
||||||
|
|
||||||
|
/** Защищает работу с множеством в функциях insertFromBlock и execute.
|
||||||
|
* Эти функции могут вызываться одновременно из разных потоков только при использовании StorageSet,
|
||||||
|
* и StorageSet вызывает только эти две функции.
|
||||||
|
* Поэтому остальные функции по работе с множеством, не защинены.
|
||||||
|
*/
|
||||||
|
mutable Poco::RWLock rwlock;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef Poco::SharedPtr<Set> SetPtr;
|
typedef Poco::SharedPtr<Set> SetPtr;
|
||||||
|
64
dbms/include/DB/Storages/StorageSet.h
Normal file
64
dbms/include/DB/Storages/StorageSet.h
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <DB/Storages/IStorage.h>
|
||||||
|
#include <DB/Interpreters/Set.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/** Позволяет сохранить множество для последующего использования в правой части оператора IN.
|
||||||
|
* При вставке в таблицу, данные будут вставлены в множество,
|
||||||
|
* а также записаны в файл-бэкап, для восстановления после перезапуска.
|
||||||
|
* Чтение из таблицы напрямую невозможно - возможно лишь указание в правой части оператора IN.
|
||||||
|
*/
|
||||||
|
class StorageSet : public IStorage
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static StoragePtr create(
|
||||||
|
const String & path_,
|
||||||
|
const String & name_,
|
||||||
|
NamesAndTypesListPtr columns_,
|
||||||
|
const NamesAndTypesList & materialized_columns_,
|
||||||
|
const NamesAndTypesList & alias_columns_,
|
||||||
|
const ColumnDefaults & column_defaults_)
|
||||||
|
{
|
||||||
|
return (new StorageSet{
|
||||||
|
path_, name_, columns_,
|
||||||
|
materialized_columns_, alias_columns_, column_defaults_})->thisPtr();
|
||||||
|
}
|
||||||
|
|
||||||
|
String getName() const override { return "Set"; }
|
||||||
|
String getTableName() const override { return name; }
|
||||||
|
|
||||||
|
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
|
||||||
|
|
||||||
|
BlockOutputStreamPtr write(ASTPtr query) override;
|
||||||
|
|
||||||
|
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
|
||||||
|
|
||||||
|
/// Получить доступ к внутренностям.
|
||||||
|
SetPtr & getSet() { return set; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
String path;
|
||||||
|
String name;
|
||||||
|
NamesAndTypesListPtr columns;
|
||||||
|
|
||||||
|
UInt64 increment = 0; /// Для имён файлов бэкапа.
|
||||||
|
SetPtr set { new Set{Limits{}} };
|
||||||
|
|
||||||
|
StorageSet(
|
||||||
|
const String & path_,
|
||||||
|
const String & name_,
|
||||||
|
NamesAndTypesListPtr columns_,
|
||||||
|
const NamesAndTypesList & materialized_columns_,
|
||||||
|
const NamesAndTypesList & alias_columns_,
|
||||||
|
const ColumnDefaults & column_defaults_);
|
||||||
|
|
||||||
|
/// Восстановление из бэкапа.
|
||||||
|
void restore();
|
||||||
|
void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory);
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -126,9 +126,9 @@ void CreatingSetsBlockInputStream::create(SubqueryForSet & subquery)
|
|||||||
msg << "Created. ";
|
msg << "Created. ";
|
||||||
|
|
||||||
if (subquery.set)
|
if (subquery.set)
|
||||||
msg << "Set with " << subquery.set->size() << " entries from " << head_rows << " rows. ";
|
msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. ";
|
||||||
if (subquery.join)
|
if (subquery.join)
|
||||||
msg << "Join with " << subquery.join->size() << " entries from " << head_rows << " rows. ";
|
msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. ";
|
||||||
if (subquery.table)
|
if (subquery.table)
|
||||||
msg << "Table with " << head_rows << " rows. ";
|
msg << "Table with " << head_rows << " rows. ";
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
#include <DB/Storages/StorageDistributed.h>
|
#include <DB/Storages/StorageDistributed.h>
|
||||||
#include <DB/Storages/StorageMemory.h>
|
#include <DB/Storages/StorageMemory.h>
|
||||||
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
||||||
|
#include <DB/Storages/StorageSet.h>
|
||||||
|
|
||||||
#include <DB/DataStreams/LazyBlockInputStream.h>
|
#include <DB/DataStreams/LazyBlockInputStream.h>
|
||||||
#include <DB/DataStreams/copyData.h>
|
#include <DB/DataStreams/copyData.h>
|
||||||
@ -675,16 +676,37 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block)
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
/// Если подзапрос или имя таблицы для SELECT.
|
/// Если подзапрос или имя таблицы для SELECT.
|
||||||
if (typeid_cast<ASTSubquery *>(&*arg) || typeid_cast<ASTIdentifier *>(&*arg))
|
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(&*arg);
|
||||||
|
if (typeid_cast<ASTSubquery *>(&*arg) || identifier)
|
||||||
{
|
{
|
||||||
/// Получаем поток блоков для подзапроса. Создаём Set и кладём на место подзапроса.
|
/// Получаем поток блоков для подзапроса. Создаём Set и кладём на место подзапроса.
|
||||||
String set_id = arg->getColumnName();
|
String set_id = arg->getColumnName();
|
||||||
ASTSet * ast_set = new ASTSet(set_id);
|
ASTSet * ast_set = new ASTSet(set_id);
|
||||||
ASTPtr ast_set_ptr = ast_set;
|
ASTPtr ast_set_ptr = ast_set;
|
||||||
|
|
||||||
|
/// Особый случай - если справа оператора IN указано имя таблицы, при чём, таблица имеет тип Set (заранее подготовленное множество).
|
||||||
|
/// TODO В этом синтаксисе не поддерживается указание имени БД.
|
||||||
|
if (identifier)
|
||||||
|
{
|
||||||
|
StoragePtr table = context.tryGetTable("", identifier->name);
|
||||||
|
|
||||||
|
if (table)
|
||||||
|
{
|
||||||
|
StorageSet * storage_set = typeid_cast<StorageSet *>(table.get());
|
||||||
|
|
||||||
|
if (storage_set)
|
||||||
|
{
|
||||||
|
SetPtr & set = storage_set->getSet();
|
||||||
|
ast_set->set = set;
|
||||||
|
arg = ast_set_ptr;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SubqueryForSet & subquery_for_set = subqueries_for_sets[set_id];
|
SubqueryForSet & subquery_for_set = subqueries_for_sets[set_id];
|
||||||
|
|
||||||
/// Если уже создали Set с таким же подзапросом.
|
/// Если уже создали Set с таким же подзапросом/таблицей.
|
||||||
if (subquery_for_set.set)
|
if (subquery_for_set.set)
|
||||||
{
|
{
|
||||||
ast_set->set = subquery_for_set.set;
|
ast_set->set = subquery_for_set.set;
|
||||||
|
@ -74,10 +74,6 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
|
|||||||
* then we compose the same list from the resulting block */
|
* then we compose the same list from the resulting block */
|
||||||
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList());
|
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList());
|
||||||
|
|
||||||
/// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку.
|
|
||||||
/// TODO Плохо - исправить.
|
|
||||||
table->write(query_ptr);
|
|
||||||
|
|
||||||
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
|
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
|
||||||
BlockOutputStreamPtr out{
|
BlockOutputStreamPtr out{
|
||||||
new ProhibitColumnsBlockOutputStream{
|
new ProhibitColumnsBlockOutputStream{
|
||||||
@ -141,10 +137,6 @@ BlockIO InterpreterInsertQuery::execute()
|
|||||||
|
|
||||||
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList());
|
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList());
|
||||||
|
|
||||||
/// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку.
|
|
||||||
/// TODO Плохо - исправить.
|
|
||||||
table->write(query_ptr);
|
|
||||||
|
|
||||||
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
|
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
|
||||||
BlockOutputStreamPtr out{
|
BlockOutputStreamPtr out{
|
||||||
new ProhibitColumnsBlockOutputStream{
|
new ProhibitColumnsBlockOutputStream{
|
||||||
|
@ -96,8 +96,10 @@ Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & key
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool Set::insertFromBlock(Block & block, bool create_ordered_set)
|
bool Set::insertFromBlock(const Block & block, bool create_ordered_set)
|
||||||
{
|
{
|
||||||
|
Poco::ScopedWriteRWLock lock(rwlock);
|
||||||
|
|
||||||
size_t keys_size = block.columns();
|
size_t keys_size = block.columns();
|
||||||
ConstColumnPlainPtrs key_columns(keys_size);
|
ConstColumnPlainPtrs key_columns(keys_size);
|
||||||
data_types.resize(keys_size);
|
data_types.resize(keys_size);
|
||||||
@ -393,6 +395,8 @@ void Set::execute(Block & block, const ColumnNumbers & arguments, size_t result,
|
|||||||
ColumnUInt8::Container_t & vec_res = c_res->getData();
|
ColumnUInt8::Container_t & vec_res = c_res->getData();
|
||||||
vec_res.resize(block.getByPosition(arguments[0]).column->size());
|
vec_res.resize(block.getByPosition(arguments[0]).column->size());
|
||||||
|
|
||||||
|
Poco::ScopedReadRWLock lock(rwlock);
|
||||||
|
|
||||||
/// Если множество пусто
|
/// Если множество пусто
|
||||||
if (data_types.empty())
|
if (data_types.empty())
|
||||||
{
|
{
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
#include <DB/Storages/StorageChunkRef.h>
|
#include <DB/Storages/StorageChunkRef.h>
|
||||||
#include <DB/Storages/StorageChunkMerger.h>
|
#include <DB/Storages/StorageChunkMerger.h>
|
||||||
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
#include <DB/Storages/StorageReplicatedMergeTree.h>
|
||||||
|
#include <DB/Storages/StorageSet.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -175,6 +176,12 @@ StoragePtr StorageFactory::get(
|
|||||||
materialized_columns, alias_columns, column_defaults,
|
materialized_columns, alias_columns, column_defaults,
|
||||||
attach, context.getSettings().max_compress_block_size);
|
attach, context.getSettings().max_compress_block_size);
|
||||||
}
|
}
|
||||||
|
else if (name == "Set")
|
||||||
|
{
|
||||||
|
return StorageSet::create(
|
||||||
|
data_path, table_name, columns,
|
||||||
|
materialized_columns, alias_columns, column_defaults);
|
||||||
|
}
|
||||||
else if (name == "Memory")
|
else if (name == "Memory")
|
||||||
{
|
{
|
||||||
return StorageMemory::create(table_name, columns, materialized_columns, alias_columns, column_defaults);
|
return StorageMemory::create(table_name, columns, materialized_columns, alias_columns, column_defaults);
|
||||||
|
141
dbms/src/Storages/StorageSet.cpp
Normal file
141
dbms/src/Storages/StorageSet.cpp
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
#include <DB/Storages/StorageSet.h>
|
||||||
|
#include <DB/IO/WriteBufferFromFile.h>
|
||||||
|
#include <DB/IO/ReadBufferFromFile.h>
|
||||||
|
#include <DB/IO/CompressedWriteBuffer.h>
|
||||||
|
#include <DB/IO/CompressedReadBuffer.h>
|
||||||
|
#include <DB/DataStreams/NativeBlockOutputStream.h>
|
||||||
|
#include <DB/DataStreams/NativeBlockInputStream.h>
|
||||||
|
#include <DB/Common/escapeForFileName.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
|
||||||
|
class SetBlockOutputStream : public IBlockOutputStream
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SetBlockOutputStream(SetPtr & set_, const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
|
||||||
|
: set(set_),
|
||||||
|
backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
|
||||||
|
backup_file_name(backup_file_name_),
|
||||||
|
backup_buf(backup_tmp_path + backup_file_name),
|
||||||
|
compressed_backup_buf(backup_buf),
|
||||||
|
backup_stream(compressed_backup_buf)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void write(const Block & block) override
|
||||||
|
{
|
||||||
|
set->insertFromBlock(block);
|
||||||
|
backup_stream.write(block);
|
||||||
|
}
|
||||||
|
|
||||||
|
void writeSuffix() override
|
||||||
|
{
|
||||||
|
backup_stream.flush();
|
||||||
|
compressed_backup_buf.next();
|
||||||
|
backup_buf.next();
|
||||||
|
|
||||||
|
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SetPtr set;
|
||||||
|
String backup_path;
|
||||||
|
String backup_tmp_path;
|
||||||
|
String backup_file_name;
|
||||||
|
WriteBufferFromFile backup_buf;
|
||||||
|
CompressedWriteBuffer compressed_backup_buf;
|
||||||
|
NativeBlockOutputStream backup_stream;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
BlockOutputStreamPtr StorageSet::write(ASTPtr query)
|
||||||
|
{
|
||||||
|
++increment;
|
||||||
|
return new SetBlockOutputStream(set, path, path + "tmp/", toString(increment) + ".bin");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
StorageSet::StorageSet(
|
||||||
|
const String & path_,
|
||||||
|
const String & name_,
|
||||||
|
NamesAndTypesListPtr columns_,
|
||||||
|
const NamesAndTypesList & materialized_columns_,
|
||||||
|
const NamesAndTypesList & alias_columns_,
|
||||||
|
const ColumnDefaults & column_defaults_)
|
||||||
|
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
|
||||||
|
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
|
||||||
|
{
|
||||||
|
restore();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void StorageSet::restore()
|
||||||
|
{
|
||||||
|
Poco::File tmp_dir(path + "tmp/");
|
||||||
|
if (!tmp_dir.exists())
|
||||||
|
{
|
||||||
|
tmp_dir.createDirectories();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
constexpr auto file_suffix = ".bin";
|
||||||
|
constexpr auto file_suffix_size = strlen(file_suffix);
|
||||||
|
|
||||||
|
DataTypeFactory data_type_factory;
|
||||||
|
|
||||||
|
Poco::DirectoryIterator dir_end;
|
||||||
|
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
|
||||||
|
{
|
||||||
|
const auto & name = dir_it.name();
|
||||||
|
|
||||||
|
if (dir_it->isFile()
|
||||||
|
&& name.size() > file_suffix_size
|
||||||
|
&& 0 == name.compare(name.size() - file_suffix_size, file_suffix_size, file_suffix)
|
||||||
|
&& dir_it->getSize() > 0)
|
||||||
|
{
|
||||||
|
/// Вычисляем максимальный номер имеющихся файлов с бэкапом, чтобы добавлять следующие файлы с большими номерами.
|
||||||
|
UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
|
||||||
|
if (file_num > increment)
|
||||||
|
increment = file_num;
|
||||||
|
|
||||||
|
restoreFromFile(dir_it->path(), data_type_factory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void StorageSet::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory)
|
||||||
|
{
|
||||||
|
ReadBufferFromFile backup_buf(file_path);
|
||||||
|
CompressedReadBuffer compressed_backup_buf(backup_buf);
|
||||||
|
NativeBlockInputStream backup_stream(compressed_backup_buf, data_type_factory);
|
||||||
|
|
||||||
|
backup_stream.readPrefix();
|
||||||
|
while (Block block = backup_stream.read())
|
||||||
|
set->insertFromBlock(block);
|
||||||
|
backup_stream.readSuffix();
|
||||||
|
|
||||||
|
/// TODO Добавить скорость, сжатые байты, объём данных в памяти, коэффициент сжатия... Обобщить всё логгирование статистики в проекте.
|
||||||
|
LOG_INFO(&Logger::get("StorageSet"), std::fixed << std::setprecision(2)
|
||||||
|
<< "Loaded from backup file " << file_path << ". "
|
||||||
|
<< backup_stream.getInfo().rows << " rows, "
|
||||||
|
<< backup_stream.getInfo().bytes / 1048576.0 << " MiB. "
|
||||||
|
<< "Set has " << set->getTotalRowCount() << " unique rows.");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void StorageSet::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
|
||||||
|
{
|
||||||
|
/// Переименовываем директорию с данными.
|
||||||
|
String new_path = new_path_to_db + escapeForFileName(new_table_name);
|
||||||
|
Poco::File(path).renameTo(new_path);
|
||||||
|
|
||||||
|
path = new_path + "/";
|
||||||
|
name = new_table_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
21
dbms/tests/queries/0_stateless/00116_storage_set.reference
Normal file
21
dbms/tests/queries/0_stateless/00116_storage_set.reference
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
Hello
|
||||||
|
test
|
||||||
|
World
|
||||||
|
world
|
||||||
|
abc
|
||||||
|
xyz
|
||||||
|
Hello
|
||||||
|
World
|
||||||
|
Hello
|
||||||
|
World
|
||||||
|
Hello
|
||||||
|
World
|
||||||
|
Hello
|
||||||
|
World
|
||||||
|
abc
|
||||||
|
Hello
|
||||||
|
World
|
||||||
|
abc
|
||||||
|
Hello
|
||||||
|
World
|
||||||
|
abc
|
33
dbms/tests/queries/0_stateless/00116_storage_set.sql
Normal file
33
dbms/tests/queries/0_stateless/00116_storage_set.sql
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
DROP TABLE IF EXISTS test.set;
|
||||||
|
DROP TABLE IF EXISTS test.set2;
|
||||||
|
|
||||||
|
CREATE TABLE test.set (x String) ENGINE = Set;
|
||||||
|
|
||||||
|
USE test;
|
||||||
|
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set;
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s NOT IN set;
|
||||||
|
|
||||||
|
INSERT INTO set VALUES ('Hello'), ('World');
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set;
|
||||||
|
|
||||||
|
RENAME TABLE set TO set2;
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2;
|
||||||
|
|
||||||
|
INSERT INTO test.set2 VALUES ('Hello'), ('World');
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2;
|
||||||
|
|
||||||
|
INSERT INTO test.set2 VALUES ('abc'), ('World');
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2;
|
||||||
|
|
||||||
|
DETACH TABLE set2;
|
||||||
|
ATTACH TABLE set2 (x String) ENGINE = Set;
|
||||||
|
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2;
|
||||||
|
|
||||||
|
RENAME TABLE set2 TO set;
|
||||||
|
SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set;
|
||||||
|
|
||||||
|
USE default;
|
||||||
|
|
||||||
|
DROP TABLE test.set;
|
Loading…
Reference in New Issue
Block a user