Merge branch 'master' into fix-materialized-pg-issue-with-generated-columns

Kseniia Sumarokova 2023-12-08 15:37:29 +01:00 committed by GitHub
commit 928993f2bd
16 changed files with 416 additions and 69 deletions

View File

@@ -1,7 +1,7 @@
# docker build -t clickhouse/integration-helper .
# Helper docker container to run iptables without sudo
FROM alpine
FROM alpine:3.18
RUN apk add --no-cache -U iproute2 \
&& for bin in iptables iptables-restore iptables-save; \
do ln -sf xtables-nft-multi "/sbin/$bin"; \

View File

@@ -367,6 +367,7 @@ struct ContextSharedPart : boost::noncopyable
std::shared_ptr<Clusters> clusters TSA_GUARDED_BY(clusters_mutex);
ConfigurationPtr clusters_config TSA_GUARDED_BY(clusters_mutex); /// Stores updated configs
std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;
/// No lock required for async_insert_queue modified only during initialization
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
@@ -3523,6 +3524,14 @@ void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_dis
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, getMacros(), config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);
++shared->clusters_version;
}
size_t Context::getClustersVersion() const
{
std::lock_guard lock(shared->clusters_mutex);
return shared->clusters_version;
}
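The pattern here is a monotonically increasing config generation: Context bumps clusters_version under clusters_mutex every time setClustersConfig() runs, and getClustersVersion() lets callers that cache per-cluster state detect a reload without comparing whole configurations. A minimal, self-contained sketch of that idea, using made-up MiniContext/NodeData stand-ins rather than the real ClickHouse classes:

#include <cstddef>
#include <iostream>
#include <mutex>
#include <string>

/// Stand-in for the shared context: the version only ever grows, under the same
/// mutex that guards the cluster configuration itself.
class MiniContext
{
public:
    void reloadClustersConfig()
    {
        std::lock_guard lock(mutex);
        ++clusters_version;              /// mirrors ++shared->clusters_version above
    }

    size_t getClustersVersion() const
    {
        std::lock_guard lock(mutex);
        return clusters_version;
    }

private:
    mutable std::mutex mutex;
    size_t clusters_version = 0;
};

/// Stand-in for per-node cached state: remembers which config version its addresses came from.
struct NodeData
{
    std::string addresses;
    size_t clusters_version = 0;
};

int main()
{
    MiniContext context;
    NodeData node{"node1:9000", context.getClustersVersion()};

    context.reloadClustersConfig();      /// e.g. SYSTEM RELOAD CONFIG changed remote_servers

    if (node.clusters_version < context.getClustersVersion())
    {
        /// The config moved on: re-resolve addresses and recreate pools/queues.
        node.addresses = "node1:9000,node2:9000";
        node.clusters_version = context.getClustersVersion();
        std::cout << "recreated with addresses: " << node.addresses << '\n';
    }
}

The real consumer is StorageDistributed::getDirectoryQueue() further down, which additionally compares the re-parsed addresses before recreating anything.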

View File

@@ -1023,6 +1023,7 @@ public:
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, bool enable_discovery = false, const String & config_name = "remote_servers");
size_t getClustersVersion() const;
void startClusterDiscovery();

View File

@@ -7,7 +7,6 @@
#include <Formats/NativeReader.h>
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ConnectionTimeouts.h>
@@ -55,7 +54,7 @@ namespace
{
template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
ConnectionPoolPtrs createPoolsForAddresses(const Cluster::Addresses & addresses, PoolFactory && factory, Poco::Logger * log)
{
ConnectionPoolPtrs pools;
@@ -76,30 +75,8 @@ ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory
}
};
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);
if (address.shard_index && dirname.ends_with("_all_replicas"))
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}
const auto & shard_info = shards_info[address.shard_index - 1];
size_t replicas = shard_info.per_replica_pools.size();
for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
{
address.replica_index = static_cast<UInt32>(replica_index);
make_connection(address);
}
}
else
make_connection(address);
}
for (const auto & address : addresses)
make_connection(address);
return pools;
}
@@ -254,34 +231,14 @@ void DistributedAsyncInsertDirectoryQueue::run()
}
ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::string & name, const StorageDistributed & storage)
ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage)
{
const auto pool_factory = [&storage, &name] (const Cluster::Address & address) -> ConnectionPoolPtr
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();
/// Check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1).
if (address.shard_index != 0)
{
if (!address.replica_index)
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"Wrong replica_index={} ({})", address.replica_index, name);
if (address.shard_index > shards_info.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"No shard with shard_index={} ({})", address.shard_index, name);
const auto & shard_info = shards_info[address.shard_index - 1];
if (address.replica_index > shard_info.per_replica_pools.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"No shard with replica_index={} ({})", address.replica_index, name);
return shard_info.per_replica_pools[address.replica_index - 1];
}
/// Existing connections pool have a higher priority.
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
@@ -318,7 +275,7 @@ ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::st
address.secure);
};
auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);
auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);
const auto settings = storage.getContext()->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
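With the directory-name parsing moved out (into StorageDistributed::parseAddresses(), shown further down), createPoolsForAddresses() reduces to mapping each pre-resolved address through the pool factory; the factory itself still prefers an existing per-replica pool from the cluster before creating a new connection pool. A small, self-contained sketch of that shape, with hypothetical Address/Pool types instead of ClickHouse's:

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Address { std::string host; int port; };
struct Pool { std::string description; };
using PoolPtr = std::shared_ptr<Pool>;

/// The simplified shape of createPoolsForAddresses(): no name parsing, just one
/// factory call per already-resolved address.
std::vector<PoolPtr> createPoolsForAddresses(
    const std::vector<Address> & addresses,
    const std::function<PoolPtr(const Address &)> & factory)
{
    std::vector<PoolPtr> pools;
    pools.reserve(addresses.size());
    for (const auto & address : addresses)
        pools.push_back(factory(address));
    return pools;
}

int main()
{
    const std::vector<Address> addresses{{"node1", 9000}, {"node2", 9000}};

    /// The factory is where pool reuse would happen (looking up existing
    /// per-replica pools first); here it simply builds a fresh pool.
    auto factory = [](const Address & address)
    {
        return std::make_shared<Pool>(Pool{address.host + ":" + std::to_string(address.port)});
    };

    auto pools = createPoolsForAddresses(addresses, factory);
    for (const auto & pool : pools)
        std::cout << pool->description << '\n';
}

Keeping the reuse policy inside the factory callable leaves the iteration itself trivial.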

View File

@@ -4,6 +4,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Cluster.h>
#include <Disks/IDisk.h>
#include <atomic>
#include <mutex>
@@ -56,7 +57,7 @@ public:
~DistributedAsyncInsertDirectoryQueue();
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
static ConnectionPoolPtr createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage);
void updatePath(const std::string & new_relative_path);

View File

@@ -1379,9 +1379,13 @@ DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(con
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_queue)
/// If the node changes, you need to recreate the DistributedAsyncInsertDirectoryQueue
if (!node_data.directory_queue
|| (node_data.clusters_version < getContext()->getClustersVersion() && node_data.addresses != parseAddresses(name)))
{
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(name, *this);
node_data.addresses = parseAddresses(name);
node_data.clusters_version = getContext()->getClustersVersion();
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(node_data.addresses, *this);
node_data.directory_queue = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
*this, disk, relative_data_path + name,
node_data.connection_pool,
@@ -1401,6 +1405,53 @@ std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::ge
return statuses;
}
Cluster::Addresses StorageDistributed::parseAddresses(const std::string & name) const
{
Cluster::Addresses addresses;
const auto & cluster = getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);
/// Check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1).
if (address.shard_index)
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}
const auto & replicas_addresses = shards_addresses[address.shard_index - 1];
size_t replicas = replicas_addresses.size();
if (dirname.ends_with("_all_replicas"))
{
for (const auto & replica_address : replicas_addresses)
addresses.push_back(replica_address);
continue;
}
if (address.replica_index > replicas)
{
LOG_ERROR(log, "No shard with replica_index={} ({})", address.replica_index, name);
continue;
}
addresses.push_back(replicas_addresses[address.replica_index - 1]);
}
else
addresses.push_back(address);
}
return addresses;
}
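parseAddresses() is now the single place that interprets async-insert directory names: each comma-separated component is either an old-style full address or a new-style shard{shard_index}_replica{replica_index} name (both indexes 1-based), and a _all_replicas suffix expands to every replica of that shard, while out-of-range indexes are logged and skipped. A toy, self-contained illustration of that mapping (real directory names also encode user, password and host, omitted here; resolveDirname and the shard layout are illustrative only):

#include <cstdio>
#include <iostream>
#include <string>
#include <vector>

using Replicas = std::vector<std::string>;   /// replica endpoints of one shard
using Shards = std::vector<Replicas>;        /// stand-in for cluster->getShardsAddresses()

/// Resolve one directory-name component against the current cluster layout.
std::vector<std::string> resolveDirname(const std::string & dirname, const Shards & shards)
{
    std::vector<std::string> result;
    size_t shard_index = 0;
    size_t replica_index = 0;
    /// Toy parser: accepts "shard<N>_replica<M>" or "shard<N>_all_replicas" only.
    if (std::sscanf(dirname.c_str(), "shard%zu_replica%zu", &shard_index, &replica_index) >= 1
        && shard_index >= 1 && shard_index <= shards.size())
    {
        const auto & replicas = shards[shard_index - 1];
        if (dirname.ends_with("_all_replicas"))
            return replicas;                                  /// fan out to every replica of the shard
        if (replica_index >= 1 && replica_index <= replicas.size())
            result.push_back(replicas[replica_index - 1]);
        /// out-of-range replica_index: skipped (the real code logs an error and continues)
    }
    return result;
}

int main()
{
    const Shards shards = {{"node1:9000"}, {"node2:9000", "node3:9000"}};
    for (const std::string name : {"shard2_replica1", "shard2_all_replicas", "shard9_replica1"})
    {
        std::cout << name << " ->";
        for (const auto & address : resolveDirname(name, shards))
            std::cout << ' ' << address;
        std::cout << '\n';
    }
}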
std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
{
UInt64 total_bytes = 0;

View File

@@ -176,6 +176,8 @@ private:
/// Used for the INSERT into Distributed in case of distributed_foreground_insert==1, from DistributedSink.
DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);
/// Parse the address corresponding to the directory name of the directory queue
Cluster::Addresses parseAddresses(const std::string & name) const;
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)
@@ -270,6 +272,8 @@ private:
{
std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_queue;
ConnectionPoolPtr connection_pool;
Cluster::Addresses addresses;
size_t clusters_version;
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;

View File

@@ -1869,7 +1869,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
}
}
throw zkutil::KeeperException(e);
throw zkutil::KeeperMultiException(e, ops, responses);
}
}

View File

@@ -2,6 +2,8 @@
#include <Parsers/ASTLiteral.h>
#include <Access/Common/AccessFlags.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
@@ -78,6 +80,7 @@ ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr c
StoragePtr TableFunctionDictionary::executeImpl(
const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const
{
context->checkAccess(AccessType::dictGet, getDatabaseName(), table_name);
StorageID dict_id(getDatabaseName(), table_name);
auto dictionary_table_structure = getActualTableStructure(context, is_insert_query);
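This one-line addition makes the dictionary() table function enforce the same dictGet right as the dictGet() function before any storage is constructed, which is what the new shell test at the end of this commit checks for an unprivileged user. A minimal, self-contained sketch of that guard-at-the-entry-point pattern, with invented AccessContext/executeDictionaryTableFunction names rather than the ClickHouse API:

#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_set>

/// Hypothetical access registry: rights are stored as "right:db.table" strings.
struct AccessContext
{
    std::unordered_set<std::string> granted;

    void checkAccess(const std::string & right, const std::string & db, const std::string & table) const
    {
        if (!granted.contains(right + ":" + db + "." + table))
            throw std::runtime_error("ACCESS_DENIED");   /// what the unprivileged user in the test sees
    }
};

/// The check runs before the storage object is even constructed.
std::string executeDictionaryTableFunction(const AccessContext & context, const std::string & db, const std::string & table)
{
    context.checkAccess("dictGet", db, table);
    return "StorageDictionary(" + db + "." + table + ")";
}

int main()
{
    const AccessContext admin{{"dictGet:default.dict"}};
    const AccessContext restricted{};

    std::cout << executeDictionaryTableFunction(admin, "default", "dict") << '\n';
    try
    {
        executeDictionaryTableFunction(restricted, "default", "dict");
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';   /// prints ACCESS_DENIED, mirroring the test's reference output
    }
}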

View File

@@ -56,21 +56,20 @@ def get_scales(runner_type: str) -> Tuple[int, int]:
"returns the multipliers for scaling down and up ASG by types"
# Scaling down is quicker on the lack of running jobs than scaling up on
# queue
scale_down = 2
# The ASG should deflate almost instantly
scale_down = 1
# the style checkers have so many noise, so it scales up too quickly
# The 5 was too quick, there are complainings regarding too slow with
# 10. I am trying 7 now.
# 7 still looks a bit slow, so I try 6
# Let's have it the same as the other ASG
# UPDATE THE COMMENT ON CHANGES
scale_up = 3
if runner_type == "style-checker":
# The ASG should deflate almost instantly
scale_down = 1
# the style checkers have so many noise, so it scales up too quickly
# The 5 was too quick, there are complainings regarding too slow with
# 10. I am trying 7 now.
# 7 still looks a bit slow, so I try 6
# Let's have it the same as the other ASG
# UPDATE THE COMMENT ON CHANGES
## scale_down = 3
if runner_type.startswith("private-"):
scale_up = 1
elif runner_type == "limited-tester":
# The limited runners should inflate and deflate faster
scale_down = 1
scale_up = 2
return scale_down, scale_up

View File

@@ -90,13 +90,25 @@ class TestSetCapacity(unittest.TestCase):
16,
),
TestCase("lower-min", 10, 5, 20, [Queue("queued", 5, "lower-min")], 10),
# scale up group with prefix private-
TestCase(
"private-increase",
1,
13,
20,
[
Queue("in_progress", 12, "private-increase"),
Queue("queued", 11, "private-increase"),
],
20,
),
# Decrease capacity
TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], 9),
TestCase("w/reserve", 1, 13, 20, [Queue("queued", 5, "w/reserve")], 5),
TestCase(
"style-checker", 1, 13, 20, [Queue("queued", 5, "style-checker")], 5
),
TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], 20),
TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], 8),
TestCase("w/reserve", 1, 23, 20, [Queue("queued", 17, "w/reserve")], 17),
TestCase("decrease", 1, 13, 20, [Queue("in_progress", 3, "decrease")], 3),
TestCase(
"style-checker",
1,

View File

@@ -0,0 +1,38 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster_with_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_with_replication>
</remote_servers>
</clickhouse>

View File

@@ -0,0 +1,232 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/remote_servers.xml"],
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/remote_servers.xml"],
)
config1 = """<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster_with_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_with_replication>
</remote_servers>
</clickhouse>"""
config2 = """<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster_with_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_with_replication>
</remote_servers>
</clickhouse>
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for _, node in cluster.instances.items():
node.query(
f"""
create table dist_local (c1 Int32, c2 String) engine=MergeTree() order by c1;
create table dist (c1 Int32, c2 String) engine=Distributed(test_cluster, currentDatabase(), dist_local, c1);
create table replica_dist_local (c1 Int32, c2 String) engine=MergeTree() order by c1;
create table replica_dist (c1 Int32, c2 String) engine=Distributed(test_cluster_with_replication, currentDatabase(), replica_dist_local, c1);
"""
)
yield cluster
finally:
cluster.shutdown()
def test_distributed_async_insert(started_cluster):
node1.query("insert into dist select number,'A' from system.numbers limit 10;")
node1.query("system flush distributed dist;")
assert int(node3.query("select count() from dist_local where c2 = 'A'")) == 5
assert int(node1.query("select count() from dist_local where c2 = 'A'")) == 5
# Add node2
node1.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
node1.query("SYSTEM RELOAD CONFIG;")
node2.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
node2.query("SYSTEM RELOAD CONFIG;")
node3.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
node3.query("SYSTEM RELOAD CONFIG;")
node1.query("insert into dist select number,'B' from system.numbers limit 12;")
node1.query("system flush distributed dist;")
assert int(node1.query("select count() from dist_local where c2 = 'B'")) == 4
assert int(node2.query("select count() from dist_local where c2 = 'B'")) == 4
assert int(node3.query("select count() from dist_local where c2 = 'B'")) == 4
# Delete node2
node1.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
node1.query("SYSTEM RELOAD CONFIG;")
node2.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
node2.query("SYSTEM RELOAD CONFIG;")
node3.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
node3.query("SYSTEM RELOAD CONFIG;")
node1.query("insert into dist select number,'C' from system.numbers limit 10;")
node1.query("system flush distributed dist;")
assert int(node1.query("select count() from dist_local where c2 = 'C'")) == 5
assert int(node2.query("select count() from dist_local where c2 = 'C'")) == 0
assert int(node3.query("select count() from dist_local where c2 = 'C'")) == 5
def test_distributed_async_insert_with_replica(started_cluster):
node1.query(
"insert into replica_dist select number,'A' from system.numbers limit 10;"
)
node1.query("system flush distributed replica_dist;")
node2_res = int(
node2.query("select count() from replica_dist_local where c2 = 'A'")
)
node3_res = int(
node3.query("select count() from replica_dist_local where c2 = 'A'")
)
assert (
int(node1.query("select count() from replica_dist_local where c2 = 'A'")) == 5
)
assert (node2_res == 0 and node3_res == 5) or (node2_res == 5 and node3_res == 0)
# Delete node2
node1.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
node1.query("SYSTEM RELOAD CONFIG;")
node2.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
node2.query("SYSTEM RELOAD CONFIG;")
node3.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
node3.query("SYSTEM RELOAD CONFIG;")
node1.query(
"insert into replica_dist select number,'B' from system.numbers limit 10;"
)
node1.query("system flush distributed replica_dist;")
assert (
int(node1.query("select count() from replica_dist_local where c2 = 'B'")) == 5
)
assert (
int(node2.query("select count() from replica_dist_local where c2 = 'B'")) == 0
)
assert (
int(node3.query("select count() from replica_dist_local where c2 = 'B'")) == 5
)
# Add node2
node1.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
node1.query("SYSTEM RELOAD CONFIG;")
node2.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
node2.query("SYSTEM RELOAD CONFIG;")
node3.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
node3.query("SYSTEM RELOAD CONFIG;")
node1.query(
"insert into replica_dist select number,'C' from system.numbers limit 10;"
)
node1.query("system flush distributed replica_dist;")
node2_res = int(
node2.query("select count() from replica_dist_local where c2 = 'C'")
)
node3_res = int(
node3.query("select count() from replica_dist_local where c2 = 'C'")
)
assert (
int(node1.query("select count() from replica_dist_local where c2 = 'C'")) == 5
)
assert (node2_res == 0 and node3_res == 5) or (node2_res == 5 and node3_res == 0)

View File

@@ -0,0 +1,3 @@
0
ACCESS_DENIED
ACCESS_DENIED

View File

@@ -0,0 +1,37 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
username="user_${CLICKHOUSE_TEST_UNIQUE_NAME}"
dictname="dict_${CLICKHOUSE_TEST_UNIQUE_NAME}"
${CLICKHOUSE_CLIENT} -nm --query "
CREATE DICTIONARY IF NOT EXISTS ${dictname}
(
id UInt64,
value UInt64
)
PRIMARY KEY id
SOURCE(NULL())
LAYOUT(FLAT())
LIFETIME(MIN 0 MAX 1000);
CREATE USER IF NOT EXISTS ${username} NOT IDENTIFIED;
GRANT CREATE TEMPORARY TABLE ON *.* to ${username};
SELECT * FROM dictionary(${dictname});
SELECT dictGet(${dictname}, 'value', 1);
"
$CLICKHOUSE_CLIENT -nm --user="${username}" --query "
SELECT * FROM dictionary(${dictname});
" 2>&1 | grep -o ACCESS_DENIED | uniq
$CLICKHOUSE_CLIENT -nm --user="${username}" --query "
SELECT dictGet(${dictname}, 'value', 1);
" 2>&1 | grep -o ACCESS_DENIED | uniq
${CLICKHOUSE_CLIENT} -nm --query "
DROP DICTIONARY IF EXISTS ${dictname};
DROP USER IF EXISTS ${username};
"