diff --git a/dbms/include/DB/DataStreams/MaterializingBlockOutputStream.h b/dbms/include/DB/DataStreams/MaterializingBlockOutputStream.h index 641a522373a..b3f4a78ad30 100644 --- a/dbms/include/DB/DataStreams/MaterializingBlockOutputStream.h +++ b/dbms/include/DB/DataStreams/MaterializingBlockOutputStream.h @@ -13,8 +13,8 @@ namespace DB class MaterializingBlockOutputStream : public IBlockOutputStream { public: - MaterializingBlockOutputStream(const BlockOutputStreamPtr & output) : output{output} - {} + MaterializingBlockOutputStream(const BlockOutputStreamPtr & output) + : output{output} {} void write(const Block & original_block) override { @@ -31,6 +31,12 @@ public: output->write(block); } + void flush() override { output->flush(); } + + void writePrefix() override { output->writePrefix(); } + void writeSuffix() override { output->writeSuffix(); } + +private: BlockOutputStreamPtr output; }; diff --git a/dbms/include/DB/DataStreams/PushingToViewsBlockOutputStream.h b/dbms/include/DB/DataStreams/PushingToViewsBlockOutputStream.h index a442f477aa2..0c1f12bd3b4 100644 --- a/dbms/include/DB/DataStreams/PushingToViewsBlockOutputStream.h +++ b/dbms/include/DB/DataStreams/PushingToViewsBlockOutputStream.h @@ -49,6 +49,12 @@ public: output->write(block); } + void flush() override + { + if (output) + output->flush(); + } + void writePrefix() override { if (output) diff --git a/dbms/include/DB/Interpreters/Join.h b/dbms/include/DB/Interpreters/Join.h index 47901cef988..a436bfa31cc 100644 --- a/dbms/include/DB/Interpreters/Join.h +++ b/dbms/include/DB/Interpreters/Join.h @@ -78,7 +78,10 @@ public: */ void joinBlock(Block & block); - size_t size() const { return getTotalRowCount(); } + /// Считает суммарное число ключей во всех Join'ах + size_t getTotalRowCount() const; + /// Считает суммарный размер в байтах буфферов всех Join'ов + размер string_pool'а + size_t getTotalByteCount() const; /// Ссылка на строку в блоке. @@ -176,11 +179,6 @@ private: /// Проверить не превышены ли допустимые размеры множества bool checkSizeLimits() const; - - /// Считает суммарное число ключей во всех Join'ах - size_t getTotalRowCount() const; - /// Считает суммарный размер в байтах буфферов всех Join'ов + размер string_pool'а - size_t getTotalByteCount() const; }; typedef Poco::SharedPtr JoinPtr; diff --git a/dbms/include/DB/Interpreters/Set.h b/dbms/include/DB/Interpreters/Set.h index 1c8024f490f..246c6107601 100644 --- a/dbms/include/DB/Interpreters/Set.h +++ b/dbms/include/DB/Interpreters/Set.h @@ -47,9 +47,12 @@ public: void createFromAST(DataTypes & types, ASTPtr node, bool create_ordered_set); // Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять. - bool insertFromBlock(Block & block, bool create_ordered_set = false); + bool insertFromBlock(const Block & block, bool create_ordered_set = false); - size_t size() const { return getTotalRowCount(); } + /// Считает суммарное число ключей во всех Set'ах + size_t getTotalRowCount() const; + /// Считает суммарный размер в байтах буфферов всех Set'ов + размер string_pool'а + size_t getTotalByteCount() const; /** Для указанных столбцов блока проверить принадлежность их значений множеству. * Записать результат в столбец в позиции result. @@ -152,16 +155,18 @@ private: /// Проверить не превышены ли допустимые размеры множества ключей bool checkSetSizeLimits() const; - /// Считает суммарное число ключей во всех Set'ах - size_t getTotalRowCount() const; - /// Считает суммарный размер в байтах буфферов всех Set'ов + размер string_pool'а - size_t getTotalByteCount() const; - /// вектор упорядоченных элементов Set /// нужен для работы индекса по первичному ключу в секции In typedef std::vector OrderedSetElements; typedef std::unique_ptr OrderedSetElementsPtr; OrderedSetElementsPtr ordered_set_elements; + + /** Защищает работу с множеством в функциях insertFromBlock и execute. + * Эти функции могут вызываться одновременно из разных потоков только при использовании StorageSet, + * и StorageSet вызывает только эти две функции. + * Поэтому остальные функции по работе с множеством, не защинены. + */ + mutable Poco::RWLock rwlock; }; typedef Poco::SharedPtr SetPtr; diff --git a/dbms/include/DB/Storages/StorageSet.h b/dbms/include/DB/Storages/StorageSet.h new file mode 100644 index 00000000000..f590d520f5c --- /dev/null +++ b/dbms/include/DB/Storages/StorageSet.h @@ -0,0 +1,64 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/** Позволяет сохранить множество для последующего использования в правой части оператора IN. + * При вставке в таблицу, данные будут вставлены в множество, + * а также записаны в файл-бэкап, для восстановления после перезапуска. + * Чтение из таблицы напрямую невозможно - возможно лишь указание в правой части оператора IN. + */ +class StorageSet : public IStorage +{ +public: + static StoragePtr create( + const String & path_, + const String & name_, + NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_) + { + return (new StorageSet{ + path_, name_, columns_, + materialized_columns_, alias_columns_, column_defaults_})->thisPtr(); + } + + String getName() const override { return "Set"; } + String getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } + + BlockOutputStreamPtr write(ASTPtr query) override; + + void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; + + /// Получить доступ к внутренностям. + SetPtr & getSet() { return set; } + +private: + String path; + String name; + NamesAndTypesListPtr columns; + + UInt64 increment = 0; /// Для имён файлов бэкапа. + SetPtr set { new Set{Limits{}} }; + + StorageSet( + const String & path_, + const String & name_, + NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_); + + /// Восстановление из бэкапа. + void restore(); + void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory); +}; + +} diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index 32cd63c27c3..c65aa4cfd72 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -126,9 +126,9 @@ void CreatingSetsBlockInputStream::create(SubqueryForSet & subquery) msg << "Created. "; if (subquery.set) - msg << "Set with " << subquery.set->size() << " entries from " << head_rows << " rows. "; + msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. "; if (subquery.join) - msg << "Join with " << subquery.join->size() << " entries from " << head_rows << " rows. "; + msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. "; if (subquery.table) msg << "Table with " << head_rows << " rows. "; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index b1ccc9a9f0a..f8f308c3baa 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -675,16 +676,37 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block) return; /// Если подзапрос или имя таблицы для SELECT. - if (typeid_cast(&*arg) || typeid_cast(&*arg)) + ASTIdentifier * identifier = typeid_cast(&*arg); + if (typeid_cast(&*arg) || identifier) { /// Получаем поток блоков для подзапроса. Создаём Set и кладём на место подзапроса. String set_id = arg->getColumnName(); ASTSet * ast_set = new ASTSet(set_id); ASTPtr ast_set_ptr = ast_set; + /// Особый случай - если справа оператора IN указано имя таблицы, при чём, таблица имеет тип Set (заранее подготовленное множество). + /// TODO В этом синтаксисе не поддерживается указание имени БД. + if (identifier) + { + StoragePtr table = context.tryGetTable("", identifier->name); + + if (table) + { + StorageSet * storage_set = typeid_cast(table.get()); + + if (storage_set) + { + SetPtr & set = storage_set->getSet(); + ast_set->set = set; + arg = ast_set_ptr; + return; + } + } + } + SubqueryForSet & subquery_for_set = subqueries_for_sets[set_id]; - /// Если уже создали Set с таким же подзапросом. + /// Если уже создали Set с таким же подзапросом/таблицей. if (subquery_for_set.set) { ast_set->set = subquery_for_set.set; diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 5bb8aea723b..778a3331c08 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -74,10 +74,6 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr) * then we compose the same list from the resulting block */ NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList()); - /// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку. - /// TODO Плохо - исправить. - table->write(query_ptr); - /// Создаем кортеж из нескольких стримов, в которые будем писать данные. BlockOutputStreamPtr out{ new ProhibitColumnsBlockOutputStream{ @@ -141,10 +137,6 @@ BlockIO InterpreterInsertQuery::execute() NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getColumnsList()); - /// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку. - /// TODO Плохо - исправить. - table->write(query_ptr); - /// Создаем кортеж из нескольких стримов, в которые будем писать данные. BlockOutputStreamPtr out{ new ProhibitColumnsBlockOutputStream{ diff --git a/dbms/src/Interpreters/Set.cpp b/dbms/src/Interpreters/Set.cpp index 62391e7b9e9..a81dd590fa1 100644 --- a/dbms/src/Interpreters/Set.cpp +++ b/dbms/src/Interpreters/Set.cpp @@ -96,8 +96,10 @@ Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & key } -bool Set::insertFromBlock(Block & block, bool create_ordered_set) +bool Set::insertFromBlock(const Block & block, bool create_ordered_set) { + Poco::ScopedWriteRWLock lock(rwlock); + size_t keys_size = block.columns(); ConstColumnPlainPtrs key_columns(keys_size); data_types.resize(keys_size); @@ -393,6 +395,8 @@ void Set::execute(Block & block, const ColumnNumbers & arguments, size_t result, ColumnUInt8::Container_t & vec_res = c_res->getData(); vec_res.resize(block.getByPosition(arguments[0]).column->size()); + Poco::ScopedReadRWLock lock(rwlock); + /// Если множество пусто if (data_types.empty()) { diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 1f953d7fd3f..0bcf755a3c8 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace DB @@ -175,6 +176,12 @@ StoragePtr StorageFactory::get( materialized_columns, alias_columns, column_defaults, attach, context.getSettings().max_compress_block_size); } + else if (name == "Set") + { + return StorageSet::create( + data_path, table_name, columns, + materialized_columns, alias_columns, column_defaults); + } else if (name == "Memory") { return StorageMemory::create(table_name, columns, materialized_columns, alias_columns, column_defaults); diff --git a/dbms/src/Storages/StorageSet.cpp b/dbms/src/Storages/StorageSet.cpp new file mode 100644 index 00000000000..c87a503686c --- /dev/null +++ b/dbms/src/Storages/StorageSet.cpp @@ -0,0 +1,141 @@ +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +class SetBlockOutputStream : public IBlockOutputStream +{ +public: + SetBlockOutputStream(SetPtr & set_, const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_) + : set(set_), + backup_path(backup_path_), backup_tmp_path(backup_tmp_path_), + backup_file_name(backup_file_name_), + backup_buf(backup_tmp_path + backup_file_name), + compressed_backup_buf(backup_buf), + backup_stream(compressed_backup_buf) + { + } + + void write(const Block & block) override + { + set->insertFromBlock(block); + backup_stream.write(block); + } + + void writeSuffix() override + { + backup_stream.flush(); + compressed_backup_buf.next(); + backup_buf.next(); + + Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name); + } + +private: + SetPtr set; + String backup_path; + String backup_tmp_path; + String backup_file_name; + WriteBufferFromFile backup_buf; + CompressedWriteBuffer compressed_backup_buf; + NativeBlockOutputStream backup_stream; +}; + + +BlockOutputStreamPtr StorageSet::write(ASTPtr query) +{ + ++increment; + return new SetBlockOutputStream(set, path, path + "tmp/", toString(increment) + ".bin"); +} + + +StorageSet::StorageSet( + const String & path_, + const String & name_, + NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_) + : IStorage{materialized_columns_, alias_columns_, column_defaults_}, + path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_) +{ + restore(); +} + + +void StorageSet::restore() +{ + Poco::File tmp_dir(path + "tmp/"); + if (!tmp_dir.exists()) + { + tmp_dir.createDirectories(); + return; + } + + constexpr auto file_suffix = ".bin"; + constexpr auto file_suffix_size = strlen(file_suffix); + + DataTypeFactory data_type_factory; + + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it) + { + const auto & name = dir_it.name(); + + if (dir_it->isFile() + && name.size() > file_suffix_size + && 0 == name.compare(name.size() - file_suffix_size, file_suffix_size, file_suffix) + && dir_it->getSize() > 0) + { + /// Вычисляем максимальный номер имеющихся файлов с бэкапом, чтобы добавлять следующие файлы с большими номерами. + UInt64 file_num = parse(name.substr(0, name.size() - file_suffix_size)); + if (file_num > increment) + increment = file_num; + + restoreFromFile(dir_it->path(), data_type_factory); + } + } +} + + +void StorageSet::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory) +{ + ReadBufferFromFile backup_buf(file_path); + CompressedReadBuffer compressed_backup_buf(backup_buf); + NativeBlockInputStream backup_stream(compressed_backup_buf, data_type_factory); + + backup_stream.readPrefix(); + while (Block block = backup_stream.read()) + set->insertFromBlock(block); + backup_stream.readSuffix(); + + /// TODO Добавить скорость, сжатые байты, объём данных в памяти, коэффициент сжатия... Обобщить всё логгирование статистики в проекте. + LOG_INFO(&Logger::get("StorageSet"), std::fixed << std::setprecision(2) + << "Loaded from backup file " << file_path << ". " + << backup_stream.getInfo().rows << " rows, " + << backup_stream.getInfo().bytes / 1048576.0 << " MiB. " + << "Set has " << set->getTotalRowCount() << " unique rows."); +} + + +void StorageSet::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) +{ + /// Переименовываем директорию с данными. + String new_path = new_path_to_db + escapeForFileName(new_table_name); + Poco::File(path).renameTo(new_path); + + path = new_path + "/"; + name = new_table_name; +} + + +} diff --git a/dbms/tests/queries/0_stateless/00116_storage_set.reference b/dbms/tests/queries/0_stateless/00116_storage_set.reference new file mode 100644 index 00000000000..01bd24ebe17 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00116_storage_set.reference @@ -0,0 +1,21 @@ +Hello +test +World +world +abc +xyz +Hello +World +Hello +World +Hello +World +Hello +World +abc +Hello +World +abc +Hello +World +abc diff --git a/dbms/tests/queries/0_stateless/00116_storage_set.sql b/dbms/tests/queries/0_stateless/00116_storage_set.sql new file mode 100644 index 00000000000..49ba086c41d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00116_storage_set.sql @@ -0,0 +1,33 @@ +DROP TABLE IF EXISTS test.set; +DROP TABLE IF EXISTS test.set2; + +CREATE TABLE test.set (x String) ENGINE = Set; + +USE test; + +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set; +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s NOT IN set; + +INSERT INTO set VALUES ('Hello'), ('World'); +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set; + +RENAME TABLE set TO set2; +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2; + +INSERT INTO test.set2 VALUES ('Hello'), ('World'); +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2; + +INSERT INTO test.set2 VALUES ('abc'), ('World'); +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2; + +DETACH TABLE set2; +ATTACH TABLE set2 (x String) ENGINE = Set; + +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set2; + +RENAME TABLE set2 TO set; +SELECT arrayJoin(['Hello', 'test', 'World', 'world', 'abc', 'xyz']) AS s WHERE s IN set; + +USE default; + +DROP TABLE test.set;