Merge pull request #18112 from GrigoryPervakov/join_set_s3

Use IDisk in Set and Join storages
commit d2653f91e1
alexey-milovidov, 2020-12-17 03:40:13 +03:00, committed by GitHub
12 changed files with 234 additions and 43 deletions

View File

@@ -10,7 +10,8 @@ class ASTStorage;
 #define SET_RELATED_SETTINGS(M) \
-    M(Bool, persistent, true, "Disable setting to avoid the overhead of writing to disk for StorageSet", 0)
+    M(Bool, persistent, true, "Disable setting to avoid the overhead of writing to disk for StorageSet", 0) \
+    M(String, disk, "default", "Name of the disk used to persist set data", 0)
 #define LIST_OF_SET_SETTINGS(M) \
     SET_RELATED_SETTINGS(M) \

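The hunk above adds a second Set-level setting: besides `persistent`, a `disk` name (defaulting to "default") now selects which configured disk backs the storage; the existing `persistent` row gains a trailing backslash so the macro list continues. A minimal sketch of what the two M() rows amount to once expanded (the struct and the query are illustrative only; the real code goes through ClickHouse's settings-traits machinery):

#include <string>

// Illustrative stand-in for the expanded settings; not the real
// DECLARE_SETTINGS_TRAITS / IMPLEMENT_SETTINGS_TRAITS plumbing.
struct SetSettingsSketch
{
    bool persistent = true;        // M(Bool, persistent, true, ...)
    std::string disk = "default";  // M(String, disk, "default", ...) -- new in this PR
};

// A query such as
//   CREATE TABLE s (n UInt64) ENGINE = Set SETTINGS disk = 's3';
// leaves persistent = true and sets disk = "s3" before the storage
// resolves the disk by name.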
View File

@@ -8,6 +8,7 @@
 #include <Core/ColumnNumbers.h>
 #include <DataStreams/IBlockInputStream.h>
 #include <DataTypes/NestedUtils.h>
+#include <Disks/IDisk.h>
 #include <Interpreters/joinDispatch.h>
 #include <Interpreters/TableJoin.h>
 #include <Interpreters/castColumn.h>
@@ -35,6 +36,7 @@ namespace ErrorCodes
 }

 StorageJoin::StorageJoin(
+    DiskPtr disk_,
     const String & relative_path_,
     const StorageID & table_id_,
     const Names & key_names_,
@@ -45,9 +47,8 @@ StorageJoin::StorageJoin(
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     bool overwrite_,
-    const Context & context_,
     bool persistent_)
-    : StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_, persistent_}
+    : StorageSetOrJoinBase{disk_, relative_path_, table_id_, columns_, constraints_, persistent_}
     , key_names(key_names_)
     , use_nulls(use_nulls_)
     , limits(limits_)
@@ -69,9 +70,9 @@ StorageJoin::StorageJoin(
 void StorageJoin::truncate(
     const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder&)
 {
-    Poco::File(path).remove(true);
-    Poco::File(path).createDirectories();
-    Poco::File(path + "tmp/").createDirectories();
+    disk->removeRecursive(path);
+    disk->createDirectories(path);
+    disk->createDirectories(path + "tmp/");

     increment = 0;
     join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
@@ -124,6 +125,7 @@ void registerStorageJoin(StorageFactory & factory)
         auto join_any_take_last_row = settings.join_any_take_last_row;
         auto old_any_join = settings.any_join_distinct_right_table_keys;
         bool persistent = true;
+        String disk_name = "default";

         if (args.storage_def && args.storage_def->settings)
         {
@@ -141,6 +143,8 @@ void registerStorageJoin(StorageFactory & factory)
                     join_any_take_last_row = setting.value;
                 else if (setting.name == "any_join_distinct_right_table_keys")
                     old_any_join = setting.value;
+                else if (setting.name == "disk")
+                    disk_name = setting.value.get<String>();
                 else if (setting.name == "persistent")
                 {
                     auto join_settings = std::make_unique<JoinSettings>();
@@ -148,12 +152,12 @@ void registerStorageJoin(StorageFactory & factory)
                     persistent = join_settings->persistent;
                 }
                 else
-                    throw Exception(
-                        "Unknown setting " + setting.name + " for storage " + args.engine_name,
-                        ErrorCodes::BAD_ARGUMENTS);
+                    throw Exception("Unknown setting " + setting.name + " for storage " + args.engine_name, ErrorCodes::BAD_ARGUMENTS);
             }
         }

+        DiskPtr disk = args.context.getDisk(disk_name);
+
         if (engine_args.size() < 3)
             throw Exception(
                 "Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, keys...).",
@@ -219,6 +223,7 @@ void registerStorageJoin(StorageFactory & factory)
         }

         return StorageJoin::create(
+            disk,
             args.relative_data_path,
             args.table_id,
             key_names,
@@ -229,7 +234,6 @@
             args.columns,
             args.constraints,
             join_any_take_last_row,
-            args.context,
             persistent);
     };

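With this change StorageJoin takes a DiskPtr and routes every filesystem operation through it, so truncate() and the write path behave the same whether the resolved disk is the local default or the S3-backed disk returned by args.context.getDisk(disk_name). A sketch of the slice of the IDisk interface this file leans on (method names taken from the diff, signatures abridged; see src/Disks/IDisk.h for the real interface):

#include <memory>
#include <string>

// Abridged stand-in for ClickHouse's IDisk; only the calls used above.
struct IDiskSketch
{
    virtual ~IDiskSketch() = default;
    virtual bool exists(const std::string & path) const = 0;
    virtual void createDirectories(const std::string & path) = 0;
    virtual void removeRecursive(const std::string & path) = 0;
    virtual void replaceFile(const std::string & from_path, const std::string & to_path) = 0;
    // writeFile() / readFile() return buffer objects in the real interface,
    // which is why backup_buf becomes a std::unique_ptr below.
};
using DiskSketchPtr = std::shared_ptr<IDiskSketch>;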
View File

@@ -67,6 +67,7 @@ private:
 protected:
     StorageJoin(
+        DiskPtr disk_,
         const String & relative_path_,
         const StorageID & table_id_,
         const Names & key_names_,
@@ -76,7 +77,6 @@ protected:
         const ColumnsDescription & columns_,
         const ConstraintsDescription & constraints_,
         bool overwrite,
-        const Context & context_,
         bool persistent_);
 };

View File

@@ -6,12 +6,11 @@
 #include <Compression/CompressedWriteBuffer.h>
 #include <DataStreams/NativeBlockOutputStream.h>
 #include <DataStreams/NativeBlockInputStream.h>
+#include <Disks/IDisk.h>
 #include <Common/formatReadable.h>
 #include <Common/escapeForFileName.h>
 #include <Common/StringUtils/StringUtils.h>
 #include <Interpreters/Set.h>
-#include <Interpreters/Context.h>
-#include <Poco/DirectoryIterator.h>
 #include <Parsers/ASTCreateQuery.h>
 #include <Parsers/ASTLiteral.h>
@@ -49,7 +48,7 @@ private:
     String backup_path;
     String backup_tmp_path;
     String backup_file_name;
-    WriteBufferFromFile backup_buf;
+    std::unique_ptr<WriteBufferFromFileBase> backup_buf;
     CompressedWriteBuffer compressed_backup_buf;
     NativeBlockOutputStream backup_stream;
     bool persistent;
@@ -68,8 +67,8 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
     , backup_path(backup_path_)
     , backup_tmp_path(backup_tmp_path_)
     , backup_file_name(backup_file_name_)
-    , backup_buf(backup_tmp_path + backup_file_name)
-    , compressed_backup_buf(backup_buf)
+    , backup_buf(table_.disk->writeFile(backup_tmp_path + backup_file_name))
+    , compressed_backup_buf(*backup_buf)
     , backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock())
     , persistent(persistent_)
 {
@@ -92,9 +91,10 @@ void SetOrJoinBlockOutputStream::writeSuffix()
     {
         backup_stream.flush();
         compressed_backup_buf.next();
-        backup_buf.next();
+        backup_buf->next();
+        backup_buf->finalize();

-        Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
+        table.disk->replaceFile(backup_tmp_path + backup_file_name, backup_path + backup_file_name);
     }
 }
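writeSuffix() keeps the old durability pattern (write the backup under tmp/, then publish it with a rename) but now goes through IDisk: finalize() flushes the write buffer to the backing store, and replaceFile() performs the swap. A self-contained sketch of the pattern against a local filesystem, with std::filesystem::rename standing in for disk->replaceFile; publishBackup and its parameters are illustrative, and table_dir is assumed to end with '/':

#include <filesystem>
#include <fstream>
#include <string>

void publishBackup(const std::string & table_dir, const std::string & file_name, const std::string & bytes)
{
    namespace fs = std::filesystem;
    fs::create_directories(table_dir + "tmp/");
    const std::string tmp_path = table_dir + "tmp/" + file_name;
    {
        std::ofstream out(tmp_path, std::ios::binary);
        out.write(bytes.data(), static_cast<std::streamsize>(bytes.size()));
    }   // stream closed (hence flushed) before publishing, like backup_buf->finalize()
    fs::rename(tmp_path, table_dir + file_name);  // readers never observe a partial file
}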
@@ -107,13 +107,14 @@ BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const
 StorageSetOrJoinBase::StorageSetOrJoinBase(
+    DiskPtr disk_,
     const String & relative_path_,
     const StorageID & table_id_,
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
-    const Context & context_,
     bool persistent_)
     : IStorage(table_id_),
+    disk(disk_),
     persistent(persistent_)
 {
     StorageInMemoryMetadata storage_metadata;
@@ -125,19 +126,18 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
     if (relative_path_.empty())
         throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);

-    base_path = context_.getPath();
-    path = base_path + relative_path_;
+    path = relative_path_;
 }

 StorageSet::StorageSet(
+    DiskPtr disk_,
     const String & relative_path_,
     const StorageID & table_id_,
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
-    const Context & context_,
     bool persistent_)
-    : StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_, persistent_},
+    : StorageSetOrJoinBase{disk_, relative_path_, table_id_, columns_, constraints_, persistent_},
     set(std::make_shared<Set>(SizeLimits(), false, true))
 {
@@ -158,9 +158,9 @@ std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return se
 void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
 {
-    Poco::File(path).remove(true);
-    Poco::File(path).createDirectories();
-    Poco::File(path + "tmp/").createDirectories();
+    disk->removeRecursive(path);
+    disk->createDirectories(path);
+    disk->createDirectories(path + "tmp/");

     Block header = metadata_snapshot->getSampleBlock();
     header = header.sortColumns();
@@ -173,24 +173,23 @@ void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_sn
 void StorageSetOrJoinBase::restore()
 {
-    Poco::File tmp_dir(path + "tmp/");
-    if (!tmp_dir.exists())
+    if (!disk->exists(path + "tmp/"))
     {
-        tmp_dir.createDirectories();
+        disk->createDirectories(path + "tmp/");
         return;
     }

     static const char * file_suffix = ".bin";
     static const auto file_suffix_size = strlen(".bin");

-    Poco::DirectoryIterator dir_end;
-    for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
+    for (auto dir_it{disk->iterateDirectory(path)}; dir_it->isValid(); dir_it->next())
     {
-        const auto & name = dir_it.name();
+        const auto & name = dir_it->name();
+        const auto & file_path = dir_it->path();

-        if (dir_it->isFile()
+        if (disk->isFile(file_path)
             && endsWith(name, file_suffix)
-            && dir_it->getSize() > 0)
+            && disk->getFileSize(file_path) > 0)
         {
             /// Calculate the maximum number of available files with a backup to add the following files with large numbers.
             UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
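restore() trades Poco::DirectoryIterator for disk->iterateDirectory() but keeps the naming scheme: each backup file is named "<num>.bin", and the largest existing number seeds the increment so new backups continue after it. A self-contained sketch of that scan (local filesystem, C++20 for ends_with; like the original's parse<UInt64>, it assumes the name prefix is numeric):

#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <string>

uint64_t maxBackupNumber(const std::string & dir)
{
    namespace fs = std::filesystem;
    uint64_t max_num = 0;
    for (const auto & entry : fs::directory_iterator(dir))
    {
        const std::string name = entry.path().filename().string();
        if (entry.is_regular_file() && entry.file_size() > 0
            && name.size() > 4 && name.ends_with(".bin"))
            max_num = std::max<uint64_t>(max_num, std::stoull(name.substr(0, name.size() - 4)));
    }
    return max_num;  // the next backup would be written as (max_num + 1).bin
}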
@@ -205,8 +204,8 @@ void StorageSetOrJoinBase::restore()
 void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
 {
-    ReadBufferFromFile backup_buf(file_path);
-    CompressedReadBuffer compressed_backup_buf(backup_buf);
+    auto backup_buf = disk->readFile(file_path);
+    CompressedReadBuffer compressed_backup_buf(*backup_buf);
     NativeBlockInputStream backup_stream(compressed_backup_buf, 0);

     backup_stream.readPrefix();
@@ -226,10 +225,9 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
 void StorageSetOrJoinBase::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
 {
     /// Rename directory with data.
-    String new_path = base_path + new_path_to_table_data;
-    Poco::File(path).renameTo(new_path);
+    disk->replaceFile(path, new_path_to_table_data);

-    path = new_path;
+    path = new_path_to_table_data;
     renameInMemory(new_table_id);
 }
@@ -251,7 +249,8 @@ void registerStorageSet(StorageFactory & factory)
             set_settings->loadFromQuery(*args.storage_def);
         }

-        return StorageSet::create(args.relative_data_path, args.table_id, args.columns, args.constraints, args.context, set_settings->persistent);
+        DiskPtr disk = args.context.getDisk(set_settings->disk);
+        return StorageSet::create(disk, args.relative_data_path, args.table_id, args.columns, args.constraints, set_settings->persistent);
     }, StorageFactory::StorageFeatures{ .supports_settings = true, });
 }

View File

@@ -2,6 +2,7 @@
 #include <ext/shared_ptr_helper.h>
 #include <Interpreters/Context.h>
 #include <Storages/IStorage.h>
+#include <Storages/SetSettings.h>
@@ -29,14 +30,14 @@ public:
 protected:
     StorageSetOrJoinBase(
+        DiskPtr disk_,
         const String & relative_path_,
         const StorageID & table_id_,
         const ColumnsDescription & columns_,
         const ConstraintsDescription & constraints_,
-        const Context & context_,
         bool persistent_);

-    String base_path;
+    DiskPtr disk;
     String path;
     bool persistent;
@@ -85,11 +86,11 @@ private:
 protected:
     StorageSet(
+        DiskPtr disk_,
         const String & relative_path_,
         const StorageID & table_id_,
         const ColumnsDescription & columns_,
         const ConstraintsDescription & constraints_,
-        const Context & context_,
         bool persistent_);
 };

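This header change is the heart of the refactor: StorageSetOrJoinBase drops the absolute base_path it used to fetch from the Context and instead holds a DiskPtr plus a disk-relative path. A hedged before/after illustration of the path handling (the values are examples only):

#include <string>

// Before: the storage resolved absolute paths itself.
std::string resolveOld(const std::string & base_path, const std::string & relative_path)
{
    return base_path + relative_path;  // e.g. "/var/lib/clickhouse/" + "data/db/set/"
}

// After: only the relative part, e.g. "data/db/set/", is stored; the DiskPtr
// owns the root, so the same string names a local directory or an S3 key
// prefix depending on which disk was configured.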
View File

@@ -0,0 +1,12 @@
<yandex>
    <shutdown_wait_unfinished>3</shutdown_wait_unfinished>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-server/log.log</log>
        <errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
        <stderr>/var/log/clickhouse-server/stderr.log</stderr>
        <stdout>/var/log/clickhouse-server/stdout.log</stdout>
    </logger>
</yandex>

View File

@@ -0,0 +1,28 @@
<?xml version="1.0"?>
<yandex>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-server/clickhouse-server.log</log>
        <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
    </logger>

    <tcp_port>9000</tcp_port>
    <listen_host>127.0.0.1</listen_host>

    <openSSL>
        <client>
            <cacheSessions>true</cacheSessions>
            <verificationMode>none</verificationMode>
            <invalidCertificateHandler>
                <name>AcceptCertificateHandler</name>
            </invalidCertificateHandler>
        </client>
    </openSSL>

    <max_concurrent_queries>500</max_concurrent_queries>
    <mark_cache_size>5368709120</mark_cache_size>
    <path>./clickhouse/</path>
    <users_config>users.xml</users_config>
</yandex>

View File

@@ -0,0 +1,14 @@
<?xml version="1.0"?>
<yandex>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <send_object_metadata>true</send_object_metadata>
            </s3>
        </disks>
    </storage_configuration>
</yandex>

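This configuration registers a disk named s3 backed by the MinIO endpoint; that is exactly the name the tests pass via SETTINGS disk='s3'. A hypothetical sketch of the name-to-disk lookup that args.context.getDisk() performs against such a registry (a map-based stand-in, not the real implementation):

#include <map>
#include <memory>
#include <stdexcept>
#include <string>

struct DiskStub { std::string name; };
using DiskStubPtr = std::shared_ptr<DiskStub>;

DiskStubPtr getDiskSketch(const std::map<std::string, DiskStubPtr> & disks, const std::string & name)
{
    auto it = disks.find(name);  // keys come from <storage_configuration><disks>
    if (it == disks.end())
        throw std::runtime_error("Unknown disk " + name);  // unknown names fail at CREATE time
    return it->second;
}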
View File

@@ -0,0 +1,12 @@
<?xml version="1.0"?>
<yandex>
    <openSSL>
        <client>
            <cacheSessions>true</cacheSessions>
            <verificationMode>none</verificationMode>
            <invalidCertificateHandler>
                <name>AcceptCertificateHandler</name>
            </invalidCertificateHandler>
        </client>
    </openSSL>
</yandex>

View File

@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
    <profiles>
        <default>
        </default>
    </profiles>

    <users>
        <default>
            <password></password>
            <networks incl="networks" replace="replace">
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
        </default>
    </users>

    <quotas>
        <default>
        </default>
    </quotas>
</yandex>

View File

@@ -0,0 +1,97 @@
import logging
import sys

import pytest

from helpers.cluster import ClickHouseCluster

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


@pytest.fixture(scope="module")
def cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance("node",
                             main_configs=["configs/minio.xml", "configs/ssl.xml", "configs/config.d/log_conf.xml"],
                             with_minio=True, stay_alive=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        cluster.shutdown()


def assert_objects_count(cluster, objects_count, path='data/'):
    minio = cluster.minio_client
    s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
    if objects_count != len(s3_objects):
        for s3_object in s3_objects:
            object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
            logging.info("Existing S3 object: %s", str(object_meta))
        assert objects_count == len(s3_objects)


def test_set_s3(cluster):
    node = cluster.instances["node"]

    node.query("CREATE TABLE testLocalSet (n UInt64) Engine = Set")
    node.query("CREATE TABLE testS3Set (n UInt64) Engine = Set SETTINGS disk='s3'")

    node.query("INSERT INTO TABLE testLocalSet VALUES (1)")
    node.query("INSERT INTO TABLE testS3Set VALUES (1)")

    assert node.query("SELECT number in testLocalSet, number in testS3Set FROM system.numbers LIMIT 3") == "0\t0\n1\t1\n0\t0\n"
    assert_objects_count(cluster, 1)

    node.query("INSERT INTO TABLE testLocalSet VALUES (2)")
    node.query("INSERT INTO TABLE testS3Set VALUES (2)")

    assert node.query("SELECT number in testLocalSet, number in testS3Set FROM system.numbers LIMIT 3") == "0\t0\n1\t1\n1\t1\n"
    assert_objects_count(cluster, 2)

    node.restart_clickhouse()
    assert node.query("SELECT number in testLocalSet, number in testS3Set FROM system.numbers LIMIT 3") == "0\t0\n1\t1\n1\t1\n"

    node.query("TRUNCATE TABLE testLocalSet")
    node.query("TRUNCATE TABLE testS3Set")

    assert node.query("SELECT number in testLocalSet, number in testS3Set FROM system.numbers LIMIT 3") == "0\t0\n0\t0\n0\t0\n"
    assert_objects_count(cluster, 0)

    node.query("DROP TABLE testLocalSet")
    node.query("DROP TABLE testS3Set")


def test_join_s3(cluster):
    node = cluster.instances["node"]

    node.query("CREATE TABLE testLocalJoin(`id` UInt64, `val` String) ENGINE = Join(ANY, LEFT, id)")
    node.query("CREATE TABLE testS3Join(`id` UInt64, `val` String) ENGINE = Join(ANY, LEFT, id) SETTINGS disk='s3'")

    node.query("INSERT INTO testLocalJoin VALUES (1, 'a')")
    node.query("INSERT INTO testS3Join VALUES (1, 'a')")

    assert node.query("SELECT joinGet('testLocalJoin', 'val', number) as local, joinGet('testS3Join', 'val', number) as s3 FROM system.numbers LIMIT 3") == "\t\na\ta\n\t\n"
    assert_objects_count(cluster, 1)

    node.query("INSERT INTO testLocalJoin VALUES (2, 'b')")
    node.query("INSERT INTO testS3Join VALUES (2, 'b')")

    assert node.query("SELECT joinGet('testLocalJoin', 'val', number) as local, joinGet('testS3Join', 'val', number) as s3 FROM system.numbers LIMIT 3") == "\t\na\ta\nb\tb\n"
    assert_objects_count(cluster, 2)

    node.restart_clickhouse()
    assert node.query("SELECT joinGet('testLocalJoin', 'val', number) as local, joinGet('testS3Join', 'val', number) as s3 FROM system.numbers LIMIT 3") == "\t\na\ta\nb\tb\n"

    node.query("TRUNCATE TABLE testLocalJoin")
    node.query("TRUNCATE TABLE testS3Join")

    assert node.query("SELECT joinGet('testLocalJoin', 'val', number) as local, joinGet('testS3Join', 'val', number) as s3 FROM system.numbers LIMIT 3") == "\t\n\t\n\t\n"
    assert_objects_count(cluster, 0)

    node.query("DROP TABLE testLocalJoin")
    node.query("DROP TABLE testS3Join")