kssenii 2023-09-14 18:41:31 +02:00
parent afcb0b2b9a
commit 342755d35e
6 changed files with 357 additions and 218 deletions

View File

@ -1,8 +1,13 @@
#include "Common/Exception.h"
#include "Common/ZooKeeper/Types.h"
#include "Interpreters/Context_fwd.h"
#include "Storages/S3Queue/S3QueueSettings.h"
#include "config.h"
#if USE_AWS_S3
#include <base/sleep.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/randomSeed.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
@ -28,11 +33,22 @@ namespace
{
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
size_t generateRescheduleInterval()
{
/// Use a more or less random interval for the unordered mode cleanup task,
/// so that cleanup tasks of distributed processing do not all schedule cleanup at the same time.
/// TODO: make the lower and upper boundaries configurable via settings
pcg64 rng(randomSeed());
//return 5000 + rng() % 30000;
return rng() % 100;
}
}
S3QueueFilesMetadata::S3QueueFilesMetadata(
const StorageS3Queue * storage_,
const S3QueueSettings & settings_)
const S3QueueSettings & settings_,
ContextPtr context)
: storage(storage_)
, mode(settings_.mode)
, max_set_size(settings_.s3queue_tracked_files_limit.value)
@ -43,6 +59,27 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(
, zookeeper_failed_path(storage->getZooKeeperPath() / "failed")
, log(&Poco::Logger::get("S3QueueFilesMetadata"))
{
if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec))
{
task = context->getSchedulePool().createTask("S3QueueCleanupFunc", [this] { cleanupThreadFunc(); });
task->activate();
auto schedule_ms = generateRescheduleInterval();
LOG_TEST(log, "Scheduling a cleanup task in {} ms", schedule_ms);
task->scheduleAfter(schedule_ms);
}
}
S3QueueFilesMetadata::~S3QueueFilesMetadata()
{
deactivateCleanupTask();
}
void S3QueueFilesMetadata::deactivateCleanupTask()
{
shutdown = true;
if (task)
task->deactivate();
}
std::string S3QueueFilesMetadata::NodeMetadata::toString() const
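
The lifecycle introduced above follows a common self-rescheduling pattern: the constructor creates and activates a BackgroundSchedulePool task and schedules it after a jittered delay, cleanupThreadFunc() reschedules itself after each run, and deactivateCleanupTask() raises the shutdown flag before deactivating the task. Below is a minimal standalone sketch of the same pattern, built on std::thread and std::condition_variable rather than ClickHouse's scheduling pool; PeriodicTask and all other names in it are illustrative, not part of the codebase.

// Sketch only: a self-rescheduling periodic task with jittered delays and a
// shutdown flag, mirroring the createTask/activate/scheduleAfter/deactivate
// lifecycle above. All names here are illustrative.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <random>
#include <thread>

class PeriodicTask
{
public:
    explicit PeriodicTask(std::function<void()> fn) : func(std::move(fn)) {}
    ~PeriodicTask() { deactivate(); }

    void activate()
    {
        worker = std::thread([this] { loop(); });
    }

    void deactivate()
    {
        {
            std::lock_guard lock(mutex);
            shutdown = true;
        }
        cv.notify_all();
        if (worker.joinable())
            worker.join();
    }

private:
    /// Random delay in [lower, upper) ms so that several replicas
    /// do not run cleanup at the same moment (cf. generateRescheduleInterval).
    static size_t jitteredIntervalMs(size_t lower = 5000, size_t upper = 35000)
    {
        static thread_local std::mt19937_64 rng{std::random_device{}()};
        return lower + rng() % (upper - lower);
    }

    void loop()
    {
        std::unique_lock lock(mutex);
        while (!shutdown)
        {
            /// Sleep until the next jittered deadline, or wake early on shutdown.
            if (cv.wait_for(lock, std::chrono::milliseconds(jitteredIntervalMs()),
                            [this] { return shutdown; }))
                break;

            lock.unlock();
            try { func(); } catch (...) { /* log and keep going */ }
            lock.lock();
        }
    }

    std::function<void()> func;
    std::thread worker;
    std::mutex mutex;
    std::condition_variable cv;
    bool shutdown = false;
};

int main()
{
    PeriodicTask cleanup([] { /* e.g. trim the tracked file set */ });
    cleanup.activate();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    /// The destructor (or an explicit deactivate()) stops the task.
}

The condition variable lets deactivation return without waiting out the current sleep interval, which is roughly the behaviour the schedule pool's deactivate() gives the storage layer here.
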
@ -109,6 +146,10 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
{
/// Create an ephemeral node in /processing
/// if a corresponding node does not exist in failed/, processed/ or processing/.
/// Return false otherwise.
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
@ -125,6 +166,10 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::str
bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
{
/// Create an ephemeral node in /processing
/// if a corresponding node does not exist in failed/ or processing/ and the max processed file check is satisfied.
/// Return false otherwise.
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = storage->getZooKeeper();
@ -195,8 +240,7 @@ void S3QueueFilesMetadata::setFileProcessed(const String & path)
void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path)
{
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
/// Create a persistent node in /processed and remove ephemeral node from /processing.
const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString();
@ -337,6 +381,129 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc
}
}
void S3QueueFilesMetadata::cleanupThreadFunc()
{
/// A background task is responsible for enforcing
/// the max_set_size and max_set_age settings for the `unordered` processing mode.
if (shutdown)
return;
try
{
cleanupThreadFuncImpl();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown)
return;
task->scheduleAfter(generateRescheduleInterval());
}
void S3QueueFilesMetadata::cleanupThreadFuncImpl()
{
chassert(max_set_size || max_set_age_sec);
const bool check_nodes_limit = max_set_size > 0;
const bool check_nodes_ttl = max_set_age_sec > 0;
const auto zk_client = storage->getZooKeeper();
auto nodes = zk_client->getChildren(zookeeper_processed_path);
if (nodes.empty())
{
LOG_TEST(log, "A set of nodes is empty");
return;
}
const bool nodes_limit_exceeded = nodes.size() > max_set_size;
if (!nodes_limit_exceeded && check_nodes_limit && !check_nodes_ttl)
{
LOG_TEST(log, "No limit exceeded");
return;
}
struct Node
{
std::string name;
NodeMetadata metadata;
};
auto node_cmp = [](const Node & a, const Node & b)
{
return a.metadata.last_processed_timestamp < b.metadata.last_processed_timestamp;
};
/// Ordered in ascending order of timestamps.
std::set<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
for (const auto & node : nodes)
{
try
{
std::string metadata_str;
if (zk_client->tryGet(zookeeper_processed_path / node, metadata_str))
{
bool inserted = sorted_nodes.emplace(node, NodeMetadata::fromString(metadata_str)).second;
chassert(inserted);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// TODO add a zookeeper lock for cleanup
LOG_TRACE(log, "Checking node limits");
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0;
for (const auto & node : sorted_nodes)
{
if (nodes_to_remove)
{
auto path = zookeeper_processed_path / node.name;
LOG_TEST(log, "Removing node at path `{}` because max files limit is reached", path.string());
auto code = zk_client->tryRemove(path);
if (code == Coordination::Error::ZOK)
--nodes_to_remove;
else
LOG_ERROR(log, "Failed to remove a node `{}`", path.string());
}
else if (check_nodes_ttl)
{
UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp;
if (node_age >= max_set_age_sec)
{
auto path = zookeeper_processed_path / node.name;
LOG_TEST(log, "Removing node at path `{}` because file ttl is reached", path.string());
auto code = zk_client->tryRemove(path);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Failed to remove a node `{}`", path.string());
}
else if (!nodes_to_remove)
{
/// The nodes limit is satisfied.
/// The nodes TTL is satisfied as well: if the current node is under the TTL, then all remaining nodes are too
/// (because we iterate in ascending timestamp order).
break;
}
}
else
{
/// Nodes limit and TTL are satisfied.
break;
}
}
LOG_TRACE(log, "Node limits check finished");
}
}
#endif
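
To summarize the trimming logic of cleanupThreadFuncImpl() in isolation: the processed entries are ordered by last_processed_timestamp ascending, the oldest are removed first until the count fits max_set_size, and then any remaining entries older than max_set_age_sec are removed, with an early break because everything after the first non-expired entry is newer. A minimal sketch of that two-phase trim over plain structs follows; there is no ZooKeeper involved and all names in it are illustrative.

// Sketch only: the size-limit + TTL trim over processed entries,
// mirroring cleanupThreadFuncImpl() above.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct ProcessedEntry
{
    std::string name;
    uint64_t last_processed_timestamp = 0; /// seconds since epoch
};

/// Returns the names that should be removed from the tracked set.
std::vector<std::string> entriesToRemove(
    std::vector<ProcessedEntry> entries,
    std::size_t max_set_size, /// 0 means "no size limit"
    uint64_t max_set_age_sec, /// 0 means "no TTL"
    uint64_t now_sec)
{
    /// Oldest first, same as the std::set ordered by timestamp above.
    std::sort(entries.begin(), entries.end(),
              [](const auto & a, const auto & b)
              { return a.last_processed_timestamp < b.last_processed_timestamp; });

    std::size_t to_remove_by_limit
        = (max_set_size && entries.size() > max_set_size) ? entries.size() - max_set_size : 0;

    std::vector<std::string> result;
    for (const auto & entry : entries)
    {
        if (to_remove_by_limit)
        {
            result.push_back(entry.name); /// over the size limit, drop the oldest
            --to_remove_by_limit;
        }
        else if (max_set_age_sec && now_sec - entry.last_processed_timestamp >= max_set_age_sec)
        {
            result.push_back(entry.name); /// expired by TTL
        }
        else
        {
            /// Size limit satisfied and this entry is within the TTL;
            /// all remaining entries are newer, so we can stop.
            break;
        }
    }
    return result;
}

int main()
{
    std::vector<ProcessedEntry> entries = {{"a", 100}, {"b", 200}, {"c", 300}, {"d", 400}};
    /// Keep at most 3 entries and drop anything older than 250 seconds at now=500:
    /// "a" goes by the size limit, "b" by the TTL, "c" and "d" stay.
    auto removed = entriesToRemove(entries, /*max_set_size=*/3, /*max_set_age_sec=*/250, /*now_sec=*/500);
    assert((removed == std::vector<std::string>{"a", "b"}));
}

Sorting ascending by timestamp is what makes the early break valid: once one entry is both within the size budget and within the TTL, every later entry is as well.
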

View File

@ -5,6 +5,7 @@
#include <filesystem>
#include <Core/Types.h>
#include <Core/SettingsEnums.h>
#include <Core/BackgroundSchedulePool.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
@ -17,7 +18,9 @@ class StorageS3Queue;
class S3QueueFilesMetadata
{
public:
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_, ContextPtr context);
~S3QueueFilesMetadata();
bool trySetFileAsProcessing(const std::string & path);
@ -25,6 +28,8 @@ public:
void setFileFailed(const std::string & path, const std::string & exception_message);
void deactivateCleanupTask();
private:
const StorageS3Queue * storage;
const S3QueueMode mode;
@ -39,6 +44,9 @@ private:
mutable std::mutex mutex;
Poco::Logger * log;
std::atomic_bool shutdown = false;
BackgroundSchedulePool::TaskHolder task;
bool trySetFileAsProcessingForOrderedMode(const std::string & path);
bool trySetFileAsProcessingForUnorderedMode(const std::string & path);
@ -59,6 +67,9 @@ private:
};
NodeMetadata createNodeMetadata(const std::string & path, const std::string & exception = "", size_t retries = 0);
void cleanupThreadFunc();
void cleanupThreadFuncImpl();
};
}

View File

@ -37,6 +37,9 @@ StorageS3QueueSource::FileIterator::FileIterator(
StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::FileIterator::next()
{
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
while (true)
{
KeyWithInfo val = glob_iterator->next();
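
The comment that moved into FileIterator::next() records the guarantee the ordered mode's "max processed file" check presumably leans on: S3 list results arrive in UTF-8 binary (byte-wise) order, which is exactly the order std::string's operator< implements, so listed keys can be compared directly. A tiny illustration, not code from this repository:

// Sketch only: byte-wise comparison of S3 keys, the ordering guarantee
// the ordered-mode "max processed file" check relies on.
#include <cassert>
#include <string>

int main()
{
    std::string already_processed = "data/file_0009.csv";
    std::string candidate = "data/file_0010.csv";

    /// std::string::operator< compares bytes, i.e. UTF-8 binary order,
    /// matching the order in which S3 returns listed keys.
    assert(already_processed < candidate);

    /// The order is lexicographic, not numeric: "file_10" sorts before "file_9".
    assert(std::string("file_10") < std::string("file_9"));
}

The second assertion is the usual caveat: the order is byte-wise rather than numeric, so zero-padded key names are what keep listing order and processing order aligned.
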

View File

@ -88,7 +88,7 @@ StorageS3Queue::StorageS3Queue(
, s3queue_settings(std::move(s3queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
, after_processing(s3queue_settings->after_processing)
, files_metadata(std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings))
, files_metadata(std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings, context_))
, configuration{configuration_}
, format_settings(format_settings_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
@ -138,8 +138,17 @@ void StorageS3Queue::startup()
void StorageS3Queue::shutdown()
{
shutdown_called = true;
if (task)
{
task->deactivate();
}
if (files_metadata)
{
files_metadata->deactivateCleanupTask();
files_metadata.reset();
}
}
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
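
The reworked shutdown() above makes the teardown order explicit: raise shutdown_called, stop the streaming task, then deactivate the metadata cleanup task before dropping the files_metadata pointer, so no background callback runs against metadata that is being released. A rough sketch of that ordering with stand-in types; none of these stubs are the real classes.

// Sketch only: stop background activity before releasing shared state,
// mirroring StorageS3Queue::shutdown() above.
#include <atomic>
#include <memory>

struct TaskStub { void deactivate() { /* stop and join the task */ } };

struct MetadataStub
{
    void deactivateCleanupTask() { /* set shutdown flag, deactivate cleanup task */ }
};

struct StorageStub
{
    std::atomic<bool> shutdown_called{false};
    std::unique_ptr<TaskStub> task = std::make_unique<TaskStub>();
    std::shared_ptr<MetadataStub> files_metadata = std::make_shared<MetadataStub>();

    void shutdown()
    {
        shutdown_called = true;                      /// 1. no new streaming iterations
        if (task)
            task->deactivate();                      /// 2. stop the streaming task
        if (files_metadata)
        {
            files_metadata->deactivateCleanupTask(); /// 3. stop metadata's background task
            files_metadata.reset();                  /// 4. now it is safe to release it
        }
    }
};

int main() { StorageStub().shutdown(); }
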
@ -182,7 +191,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
{
auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
auto file_iterator = createFileIterator(local_context, query);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
auto internal_source = std::make_unique<StorageS3Source>(
read_from_format_info, configuration.format, getName(), local_context, format_settings,

View File

@ -1,9 +1,7 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>

View File

@ -191,6 +191,7 @@ def create_table(
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32",
additional_settings={},
file_format="CSV",
):
settings = {
"s3queue_loading_retries": 0,
@ -204,7 +205,7 @@ def create_table(
node.query(f"DROP TABLE IF EXISTS {table_name}")
create_query = f"""
CREATE TABLE {table_name} ({format})
ENGINE = S3Queue('{url}', {AUTH}'CSV')
ENGINE = S3Queue('{url}', {AUTH}'{file_format}')
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
node.query(create_query)
@ -527,50 +528,52 @@ def test_streaming_to_many_views(started_cluster, mode):
def test_multiple_tables_meta_mismatch(started_cluster):
files_path = f"test_meta"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
node = started_cluster.instances["instance"]
table_name = f"multiple_tables_meta_mismatch"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
# check mode
failed = False
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_meta';
"""
create_table(
started_cluster,
node,
f"{table_name}_copy",
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
failed = True
assert failed is True
# check columns
table_format_copy = table_format + ", column4 UInt32"
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format_copy})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32",
additional_settings={
"keeper_path": keeper_path,
},
)
except QueryRuntimeException as e:
assert (
@ -583,172 +586,96 @@ def test_multiple_tables_meta_mismatch(started_cluster):
# check format
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'TSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32",
additional_settings={
"keeper_path": keeper_path,
},
file_format="TSV",
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
failed = True
assert failed is True
# create working engine
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
def test_max_set_age(started_cluster):
files_to_generate = 10
max_age = 1
files_path = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_age',
s3queue_tracked_files_limit = 10,
s3queue_tracked_file_ttl_sec = {max_age};
"""
)
total_values = generate_random_files(
files_to_generate, files_path, started_cluster, bucket, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
time.sleep(max_age + 1)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"multiple_tables_streaming_sync_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
poll_size = 30
files_path = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_copy;
DROP TABLE IF EXISTS test.s3_queue_copy_2;
for i in range(3):
table = f"{table_name}_{i + 1}"
dst_table = f"{dst_table_name}_{i + 1}"
create_table(
started_cluster,
node,
table,
mode,
files_path,
additional_settings={
"s3queue_polling_size": poll_size,
"keeper_path": keeper_path,
},
)
create_mv(node, table, dst_table)
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy_2 ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
SELECT
*
FROM test.s3_queue_copy;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS
SELECT
*
FROM test.s3_queue_copy_2;
"""
)
total_values = generate_random_files(
files_to_generate, files_path, started_cluster, bucket, row_num=1
started_cluster, files_path, files_to_generate, row_num=1
)
def get_count(table_name):
return int(run_query(instance, f"SELECT count() FROM {table_name}"))
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(100):
if (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_1"
).splitlines()
]
get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy"
res2 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy).splitlines()
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_2"
).splitlines()
]
get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2"
res3 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy_2).splitlines()
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_3"
).splitlines()
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
@ -757,54 +684,41 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) == files_to_generate
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
files_to_generate = 100
node = started_cluster.instances["instance"]
node_2 = started_cluster.instances["instance2"]
table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
poll_size = 2
files_path = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
for inst in [instance, instance_2]:
inst.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
"""
for instance in [node, node_2]:
create_table(
started_cluster,
instance,
table_name,
mode,
files_path,
additional_settings={
"s3queue_polling_size": poll_size,
"keeper_path": keeper_path,
},
)
for inst in [instance, instance_2]:
inst.query(
f"""
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
for instance in [node, node_2]:
create_mv(instance, table_name, dst_table_name)
total_values = generate_random_files(
files_to_generate, files_path, started_cluster, bucket, row_num=1
started_cluster, files_path, files_to_generate, row_num=1
)
def get_count(node, table_name):
@ -812,18 +726,15 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
for _ in range(150):
if (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
res2 = [
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
]
assert len(res1) + len(res2) == files_to_generate
@ -837,11 +748,51 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == files_to_generate
def test_max_set_age(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"max_set_age"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
max_age = 1
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_files_limit": 10,
"s3queue_tracked_file_ttl_sec": max_age,
},
)
node.wait_for_log_line("Checking node limits")
node.wait_for_log_line("Node limits check finished")
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
res1 = [
list(map(int, l.split()))
for l in run_query(node, f"SELECT * FROM {table_name}").splitlines()
]
assert res1 == total_values
time.sleep(max_age + 1)
res1 = [
list(map(int, l.split()))
for l in run_query(node, f"SELECT * FROM {table_name}").splitlines()
]
assert res1 == total_values
def test_max_set_size(started_cluster):
files_to_generate = 10
files_path = f"test_multiple"