kssenii 2024-01-30 15:58:35 +01:00
parent df7d3b8a40
commit 145f6a31cb
4 changed files with 11 additions and 10 deletions

src/Storages/S3Queue/S3QueueFilesMetadata.cpp

@@ -160,11 +160,7 @@ void S3QueueFilesMetadata::deactivateCleanupTask()
 
 zkutil::ZooKeeperPtr S3QueueFilesMetadata::getZooKeeper() const
 {
-    if (!zookeeper || zookeeper->expired())
-    {
-        zookeeper = Context::getGlobalContextInstance()->getZooKeeper();
-    }
-    return zookeeper;
+    return Context::getGlobalContextInstance()->getZooKeeper();
 }
 
 S3QueueFilesMetadata::FileStatusPtr S3QueueFilesMetadata::getFileStatus(const std::string & path)
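The hunk above removes the lazily refreshed session cache: instead of checking a mutable member for expiry on every call (from a const method, so presumably without synchronization), getZooKeeper() now simply asks the global context for the current session. A minimal sketch of the before/after pattern, using stand-in types rather than ClickHouse's real zkutil::ZooKeeper and Context classes:

#include <memory>

// Stand-in session type; not ClickHouse's real zkutil::ZooKeeper.
struct Session
{
    bool expired() const { return is_expired; }
    bool is_expired = false;
};
using SessionPtr = std::shared_ptr<Session>;

// Stand-in for Context::getGlobalContextInstance()->getZooKeeper():
// one place that always hands out a live session, renewing on expiry.
SessionPtr getCurrentSession()
{
    static SessionPtr current = std::make_shared<Session>();
    if (current->expired())
        current = std::make_shared<Session>();
    return current;
}

// Before: each object lazily refreshes its own mutable cache. Two
// threads calling get() concurrently race on the unsynchronized write.
struct CachedClient
{
    mutable SessionPtr session;
    SessionPtr get() const
    {
        if (!session || session->expired())
            session = getCurrentSession();
        return session;
    }
};

// After: no per-object cache, so no race and no duplicated expiry logic.
struct FreshClient
{
    SessionPtr get() const { return getCurrentSession(); }
};

Dropping the per-object cache is also what lets the header file below delete its mutable zkutil::ZooKeeperPtr member.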
@@ -318,7 +314,7 @@ size_t S3QueueFilesMetadata::getIdForProcessingThread(size_t thread_id, size_t s
 
 size_t S3QueueFilesMetadata::getProcessingIdForPath(const std::string & path) const
 {
-    return sipHash64(path.data(), path.size()) % getProcessingIdsNum();
+    return sipHash64(path) % getProcessingIdsNum();
 }
 
 S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
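Functionally this second hunk is a no-op: both sipHash64 calls hash the full path string, and the modulo of the hash pins each file path to a stable processing id. A rough sketch of that assignment, with std::hash standing in for sipHash64 and a hypothetical modulo split standing in for isProcessingIdBelongsToShard (the real shard mapping may differ):

#include <cstddef>
#include <functional>
#include <iostream>
#include <string>

// Hash the path to a stable id in [0, processing_ids_num);
// std::hash is a stand-in for ClickHouse's sipHash64.
size_t getProcessingIdForPath(const std::string & path, size_t processing_ids_num)
{
    return std::hash<std::string>{}(path) % processing_ids_num;
}

// Hypothetical shard test: each shard owns the ids congruent to its
// number, so every path lands on exactly one shard.
bool processingIdBelongsToShard(size_t id, size_t shard, size_t shards_num)
{
    return id % shards_num == shard;
}

int main()
{
    const size_t ids_num = 8;
    const size_t shards_num = 2;
    for (const std::string path : {"data/a.csv", "data/b.csv", "data/c.csv"})
    {
        const size_t id = getProcessingIdForPath(path, ids_num);
        std::cout << path << " -> id " << id
                  << ", shard 0 owns it: "
                  << processingIdBelongsToShard(id, 0, shards_num) << '\n';
    }
}

Because the id depends only on the path, every processor computes the same assignment independently, with no coordination needed to decide who handles which file.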

src/Storages/S3Queue/S3QueueFilesMetadata.h

@@ -129,7 +129,6 @@ private:
     const fs::path zookeeper_cleanup_lock_path;
     LoggerPtr log;
-    mutable zkutil::ZooKeeperPtr zookeeper;
     std::atomic_bool shutdown = false;
     BackgroundSchedulePool::TaskHolder task;

src/Storages/S3Queue/StorageS3QueueSource.cpp

@@ -100,7 +100,7 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(si
 {
     if (metadata->isProcessingIdBelongsToShard(processing_id_for_key, current_shard))
     {
-        LOG_TEST(log, "Putting key {} into queue of shard {} (total: {})",
+        LOG_TEST(log, "Putting key {} into queue of processor {} (total: {})",
                  val->key, processing_id_for_key, sharded_keys.size());
         if (auto it = sharded_keys.find(idx); it != sharded_keys.end())

tests/integration/test_storage_s3_queue/test.py

@@ -1209,6 +1209,12 @@ def test_shards_distributed(started_cluster, mode, processing_threads):
     shard_nodes = zk.get_children(f"{keeper_path}/shards/")
     assert len(shard_nodes) == shards_num
 
+    node.restart_clickhouse()
+    time.sleep(10)
+    assert (
+        get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
+    ) == total_rows
+
 
 def test_settings_check(started_cluster):
     node = started_cluster.instances["instance"]
@@ -1219,8 +1225,6 @@ def test_settings_check(started_cluster):
     files_path = f"{table_name}_data"
     mode = "ordered"
 
-    node.restart_clickhouse()
-
     create_table(
         started_cluster,
         node,
@@ -1271,7 +1275,9 @@ def test_settings_check(started_cluster):
     assert "s3queue_current_shard_num = 0" in node.query(
         f"SHOW CREATE TABLE {table_name}"
     )
 
+    node.restart_clickhouse()
+
     assert "s3queue_current_shard_num = 0" in node.query(
         f"SHOW CREATE TABLE {table_name}"
     )