ClickHouse/dbms/src/Storages/StorageSet.cpp

221 lines
6.5 KiB
C++
Raw Normal View History

#include <Storages/StorageSet.h>
#include <Storages/StorageFactory.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Interpreters/Set.h>
2015-04-16 06:12:35 +00:00
#include <Poco/DirectoryIterator.h>
2015-01-27 00:52:03 +00:00
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern).
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_FILE_NAME;
}
2016-10-25 06:49:24 +00:00
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_);
2016-10-25 06:49:24 +00:00
Block getHeader() const override { return table.getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
2016-10-25 06:49:24 +00:00
private:
StorageSetOrJoinBase & table;
String backup_path;
String backup_tmp_path;
String backup_file_name;
WriteBufferFromFile backup_buf;
CompressedWriteBuffer compressed_backup_buf;
NativeBlockOutputStream backup_stream;
2016-10-25 06:49:24 +00:00
};
/// Remembers the backup locations and opens the backup file at `backup_tmp_path_ + backup_file_name_`.
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
    StorageSetOrJoinBase & table_,
    const String & backup_path_,
    const String & backup_tmp_path_,
    const String & backup_file_name_)
    : table(table_)
    , backup_path(backup_path_)
    , backup_tmp_path(backup_tmp_path_)
    , backup_file_name(backup_file_name_)
    , backup_buf(backup_tmp_path_ + backup_file_name_)
    , compressed_backup_buf(backup_buf)
    , backup_stream(compressed_backup_buf, 0, table_.getSampleBlock())
{
}
2015-01-27 00:52:03 +00:00
void SetOrJoinBlockOutputStream::write(const Block & block)
{
/// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
Block sorted_block = block.sortColumns();
table.insertBlock(sorted_block);
backup_stream.write(sorted_block);
}
2015-01-27 00:52:03 +00:00
void SetOrJoinBlockOutputStream::writeSuffix()
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
2015-01-27 00:52:03 +00:00
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
}
2015-01-27 00:52:03 +00:00
2017-12-01 21:13:25 +00:00
/// Creates an output stream that inserts into this table and backs the data up
/// into a new file "<increment>.bin", written under "tmp/" first.
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
{
    /// The next number becomes the name of the backup file for this stream.
    ++increment;

    const String tmp_dir_path = path + "tmp/";
    const String backup_file_name = toString(increment) + ".bin";

    return std::make_shared<SetOrJoinBlockOutputStream>(*this, path, tmp_dir_path, backup_file_name);
}
/// Sets the data directory of the table to `path_` + escaped table name + '/'.
/// Throws INCORRECT_FILE_NAME if `path_` is empty.
StorageSetOrJoinBase::StorageSetOrJoinBase(
    const String & path_,
    const String & table_name_,
    const ColumnsDescription & columns_)
    : IStorage{columns_}
    , table_name(table_name_)
{
    if (path_.empty())
        throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);

    const String escaped_table_name = escapeForFileName(table_name_);
    path = path_ + escaped_table_name + '/';
}
/// Creates an empty set with the table's header and then loads the backed up data via restore().
StorageSet::StorageSet(
    const String & path_,
    const String & name_,
    const ColumnsDescription & columns_)
    : StorageSetOrJoinBase{path_, name_, columns_}
    , set(std::make_shared<Set>(SizeLimits()))
{
    /// The header of the set is the sample block of the table with sorted columns.
    const Block sorted_header = getSampleBlock().sortColumns();
    set->setHeader(sorted_header);

    restore();
}
2018-02-08 14:15:21 +00:00
/// Inserts the block into the underlying set, with fill_set_elements = false.
void StorageSet::insertBlock(const Block & block)
{
    set->insertFromBlock(block, /*fill_set_elements=*/false);
}
2018-04-21 00:35:20 +00:00
/// Returns the total row count reported by the underlying set.
size_t StorageSet::getSize() const
{
    return set->getTotalRowCount();
}
2018-06-09 15:48:22 +00:00
void StorageSet::truncate(const ASTPtr &)
2018-04-21 00:35:20 +00:00
{
Block header = getSampleBlock();
header = header.sortColumns();
2018-06-09 15:48:22 +00:00
2018-04-21 00:35:20 +00:00
increment = 0;
set = std::make_shared<Set>(SizeLimits());
set->setHeader(header);
2018-04-21 00:35:20 +00:00
static const auto file_suffix = ".bin";
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
{
const auto & name = dir_it.name();
if (dir_it->isFile() && endsWith(name, file_suffix) && dir_it->getSize() > 0)
dir_it->remove(false);
}
};
Squashed commit of the following: commit e712f469a55ff34ad34b482b15cc4153b7ad7233 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:59:13 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a002823084e3a79bffcc17d479620a68eb0644b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:58:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9e06f407c8ee781ed8ddf98bdfcc31846bf2a0fe Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:55:14 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 9581620f1e839f456fa7894aa1f996d5162ac6cd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:54:22 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2a8564c68cb6cc3649fafaf401256d43c9a2e777 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:47:34 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit cf60632d78ec656be3304ef4565e859bb6ce80ba Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:40:09 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit ee3d1dc6e0c4ca60e3ac1e0c30d4b3ed1e66eca0 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:22:49 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 65592ef7116a90104fcd524b53ef8b7cf22640f2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:18:17 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 37972c257320d3b7e7b294e0fdeffff218647bfd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:17:06 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit dd909d149974ce5bed2456de1261aa5a368fd3ff Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:16:28 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 3cf43266ca7e30adf01212b1a739ba5fe43639fd Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:15:42 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 6731a3df96d1609286e2536b6432916af7743f0f Author: Alexey Milovidov 
<milovidov@yandex-team.ru> Date: Sat Jan 14 11:13:35 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 1b5727e0d56415b7add4cb76110105358663602c Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:11:18 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit bbcf726a55685b8e72f5b40ba0bf1904bd1c0407 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:09:04 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit c03b477d5e2e65014e8906ecfa2efb67ee295af1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:06:30 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 2986e2fb0466bc18d73693dcdded28fccc0dc66b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:05:44 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit 5d6cdef13d2e02bd5c4954983334e9162ab2635b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:04:53 2017 +0300 Less dependencies [#CLICKHOUSE-2] commit f2b819b25ce8b2ccdcb201eefb03e1e6f5aab590 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Sat Jan 14 11:01:47 2017 +0300 Less dependencies [#CLICKHOUSE-2]
2017-01-14 09:00:19 +00:00
void StorageSetOrJoinBase::restore()
2015-01-27 00:52:03 +00:00
{
Poco::File tmp_dir(path + "tmp/");
if (!tmp_dir.exists())
{
tmp_dir.createDirectories();
return;
}
static const auto file_suffix = ".bin";
static const auto file_suffix_size = strlen(".bin");
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
{
const auto & name = dir_it.name();
if (dir_it->isFile()
&& endsWith(name, file_suffix)
&& dir_it->getSize() > 0)
{
/// Calculate the maximum number of available files with a backup to add the following files with large numbers.
UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
if (file_num > increment)
increment = file_num;
restoreFromFile(dir_it->path());
}
}
2015-01-27 00:52:03 +00:00
}
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
2015-01-27 00:52:03 +00:00
{
ReadBufferFromFile backup_buf(file_path);
CompressedReadBuffer compressed_backup_buf(backup_buf);
NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
backup_stream.readPrefix();
while (Block block = backup_stream.read())
insertBlock(block);
backup_stream.readSuffix();
/// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
LOG_INFO(&Logger::get("StorageSetOrJoinBase"), std::fixed << std::setprecision(2)
<< "Loaded from backup file " << file_path << ". "
<< backup_stream.getProfileInfo().rows << " rows, "
<< backup_stream.getProfileInfo().bytes / 1048576.0 << " MiB. "
<< "State has " << getSize() << " unique rows.");
2015-01-27 00:52:03 +00:00
}
2017-12-01 21:13:25 +00:00
void StorageSetOrJoinBase::rename(const String & new_path_to_db, const String & /*new_database_name*/, const String & new_table_name)
2015-01-27 00:52:03 +00:00
{
/// Rename directory with data.
String new_path = new_path_to_db + escapeForFileName(new_table_name);
Poco::File(path).renameTo(new_path);
2015-01-27 00:52:03 +00:00
path = new_path + "/";
table_name = new_table_name;
2015-01-27 00:52:03 +00:00
}
/// Registers the "Set" table engine in the factory. The engine accepts no arguments.
void registerStorageSet(StorageFactory & factory)
{
    auto create_set_storage = [](const StorageFactory::Arguments & args)
    {
        const bool has_engine_args = !args.engine_args.empty();
        if (has_engine_args)
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        return StorageSet::create(args.data_path, args.table_name, args.columns);
    };

    factory.registerStorage("Set", create_set_storage);
}
2015-01-27 00:52:03 +00:00
}