mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #28637 from vitlibar/fix-materialized-column-as-sharding-key
Fix materialized column as sharding key
This commit is contained in:
commit
8a01b32cba
@ -4,6 +4,8 @@
|
||||
#include <Storages/StorageDistributed.h>
|
||||
#include <Disks/StoragePolicy.h>
|
||||
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
|
||||
@ -88,26 +90,40 @@ static void writeBlockConvert(PushingPipelineExecutor & executor, const Block &
|
||||
}
|
||||
|
||||
|
||||
static ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Names & column_names)
|
||||
{
|
||||
auto query = std::make_shared<ASTInsertQuery>();
|
||||
query->table_id = StorageID(database, table);
|
||||
auto columns = std::make_shared<ASTExpressionList>();
|
||||
query->columns = columns;
|
||||
query->children.push_back(columns);
|
||||
for (const auto & column_name : column_names)
|
||||
columns->children.push_back(std::make_shared<ASTIdentifier>(column_name));
|
||||
return query;
|
||||
}
|
||||
|
||||
|
||||
DistributedSink::DistributedSink(
|
||||
ContextPtr context_,
|
||||
StorageDistributed & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const ASTPtr & query_ast_,
|
||||
const ClusterPtr & cluster_,
|
||||
bool insert_sync_,
|
||||
UInt64 insert_timeout_,
|
||||
StorageID main_table_)
|
||||
StorageID main_table_,
|
||||
const Names & columns_to_send_)
|
||||
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
||||
, context(Context::createCopy(context_))
|
||||
, storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, query_ast(query_ast_)
|
||||
, query_string(queryToString(query_ast_))
|
||||
, query_ast(createInsertToRemoteTableQuery(main_table_.database_name, main_table_.table_name, columns_to_send_))
|
||||
, query_string(queryToString(query_ast))
|
||||
, cluster(cluster_)
|
||||
, insert_sync(insert_sync_)
|
||||
, allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
|
||||
, insert_timeout(insert_timeout_)
|
||||
, main_table(main_table_)
|
||||
, columns_to_send(columns_to_send_.begin(), columns_to_send_.end())
|
||||
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
|
||||
{
|
||||
const auto & settings = context->getSettingsRef();
|
||||
@ -128,27 +144,25 @@ void DistributedSink::consume(Chunk chunk)
|
||||
|
||||
auto ordinary_block = getHeader().cloneWithColumns(chunk.detachColumns());
|
||||
|
||||
if (!allow_materialized)
|
||||
{
|
||||
/* They are added by the AddingDefaultBlockOutputStream, and we will get
|
||||
* different number of columns eventually */
|
||||
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
|
||||
{
|
||||
if (ordinary_block.has(col.name))
|
||||
{
|
||||
ordinary_block.erase(col.name);
|
||||
LOG_DEBUG(log, "{}: column {} will be removed, because it is MATERIALIZED",
|
||||
storage.getStorageID().getNameForLogs(), col.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (insert_sync)
|
||||
writeSync(ordinary_block);
|
||||
else
|
||||
writeAsync(ordinary_block);
|
||||
}
|
||||
|
||||
|
||||
Block DistributedSink::removeSuperfluousColumns(Block block) const
|
||||
{
|
||||
for (size_t i = block.columns(); i;)
|
||||
{
|
||||
--i;
|
||||
if (!columns_to_send.contains(block.getByPosition(i).name))
|
||||
block.erase(i);
|
||||
}
|
||||
return block;
|
||||
}
|
||||
|
||||
|
||||
void DistributedSink::writeAsync(const Block & block)
|
||||
{
|
||||
if (random_shard_insert)
|
||||
@ -403,6 +417,8 @@ void DistributedSink::writeSync(const Block & block)
|
||||
{
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
const auto & shards_info = cluster->getShardsInfo();
|
||||
Block block_to_send = removeSuperfluousColumns(block);
|
||||
|
||||
size_t start = 0;
|
||||
size_t end = shards_info.size();
|
||||
|
||||
@ -415,7 +431,7 @@ void DistributedSink::writeSync(const Block & block)
|
||||
if (!pool)
|
||||
{
|
||||
/// Deferred initialization. Only for sync insertion.
|
||||
initWritingJobs(block, start, end);
|
||||
initWritingJobs(block_to_send, start, end);
|
||||
|
||||
size_t jobs_count = random_shard_insert ? 1 : (remote_jobs_count + local_jobs_count);
|
||||
size_t max_threads = std::min<size_t>(settings.max_distributed_connections, jobs_count);
|
||||
@ -460,7 +476,7 @@ void DistributedSink::writeSync(const Block & block)
|
||||
finished_jobs_count = 0;
|
||||
for (size_t shard_index : collections::range(start, end))
|
||||
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
|
||||
pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
|
||||
pool->scheduleOrThrowOnError(runWritingJob(job, block_to_send, num_shards));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -585,12 +601,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
|
||||
{
|
||||
const auto & shard_info = cluster->getShardsInfo()[shard_id];
|
||||
const auto & settings = context->getSettingsRef();
|
||||
Block block_to_send = removeSuperfluousColumns(block);
|
||||
|
||||
if (shard_info.hasInternalReplication())
|
||||
{
|
||||
if (shard_info.isLocal() && settings.prefer_localhost_replica)
|
||||
/// Prefer insert into current instance directly
|
||||
writeToLocal(block, shard_info.getLocalNodeCount());
|
||||
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
|
||||
else
|
||||
{
|
||||
const auto & path = shard_info.insertPathForInternalReplication(
|
||||
@ -598,13 +615,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
|
||||
settings.use_compact_format_in_distributed_parts_names);
|
||||
if (path.empty())
|
||||
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
||||
writeToShard(block, {path});
|
||||
writeToShard(block_to_send, {path});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (shard_info.isLocal() && settings.prefer_localhost_replica)
|
||||
writeToLocal(block, shard_info.getLocalNodeCount());
|
||||
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
|
||||
|
||||
std::vector<std::string> dir_names;
|
||||
for (const auto & address : cluster->getShardsAddresses()[shard_id])
|
||||
@ -612,7 +629,7 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
|
||||
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
|
||||
|
||||
if (!dir_names.empty())
|
||||
writeToShard(block, dir_names);
|
||||
writeToShard(block_to_send, dir_names);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,11 +43,11 @@ public:
|
||||
ContextPtr context_,
|
||||
StorageDistributed & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const ASTPtr & query_ast_,
|
||||
const ClusterPtr & cluster_,
|
||||
bool insert_sync_,
|
||||
UInt64 insert_timeout_,
|
||||
StorageID main_table_);
|
||||
StorageID main_table_,
|
||||
const Names & columns_to_send_);
|
||||
|
||||
String getName() const override { return "DistributedSink"; }
|
||||
void consume(Chunk chunk) override;
|
||||
@ -65,6 +65,9 @@ private:
|
||||
|
||||
void writeAsyncImpl(const Block & block, size_t shard_id = 0);
|
||||
|
||||
/// Removes columns which should not be sent to shards.
|
||||
Block removeSuperfluousColumns(Block block) const;
|
||||
|
||||
/// Increments finished_writings_count after each repeat.
|
||||
void writeToLocal(const Block & block, size_t repeats);
|
||||
|
||||
@ -84,7 +87,9 @@ private:
|
||||
/// Returns the number of blocks was written for each cluster node. Uses during exception handling.
|
||||
std::string getCurrentStateDescription();
|
||||
|
||||
/// Context used for writing to remote tables.
|
||||
ContextMutablePtr context;
|
||||
|
||||
StorageDistributed & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
ASTPtr query_ast;
|
||||
@ -102,6 +107,7 @@ private:
|
||||
/// Sync-related stuff
|
||||
UInt64 insert_timeout; // in seconds
|
||||
StorageID main_table;
|
||||
NameSet columns_to_send;
|
||||
Stopwatch watch;
|
||||
Stopwatch watch_current_block;
|
||||
std::optional<ThreadPool> pool;
|
||||
|
@ -159,23 +159,6 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
|
||||
return modified_query_ast;
|
||||
}
|
||||
|
||||
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
|
||||
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
|
||||
/// the sample block instead.
|
||||
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
|
||||
{
|
||||
auto query = std::make_shared<ASTInsertQuery>();
|
||||
query->table_id = StorageID(database, table);
|
||||
|
||||
auto columns = std::make_shared<ASTExpressionList>();
|
||||
query->columns = columns;
|
||||
query->children.push_back(columns);
|
||||
for (const auto & col : sample_block)
|
||||
columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
|
||||
|
||||
return query;
|
||||
}
|
||||
|
||||
/// Calculate maximum number in file names in directory and all subdirectories.
|
||||
/// To ensure global order of data blocks yet to be sent across server restarts.
|
||||
UInt64 getMaximumFileNumber(const std::string & dir_path)
|
||||
@ -682,17 +665,16 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
|
||||
bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
|
||||
auto timeout = settings.insert_distributed_timeout;
|
||||
|
||||
Block sample_block;
|
||||
if (!settings.insert_allow_materialized_columns)
|
||||
sample_block = metadata_snapshot->getSampleBlockNonMaterialized();
|
||||
Names columns_to_send;
|
||||
if (settings.insert_allow_materialized_columns)
|
||||
columns_to_send = metadata_snapshot->getSampleBlock().getNames();
|
||||
else
|
||||
sample_block = metadata_snapshot->getSampleBlock();
|
||||
columns_to_send = metadata_snapshot->getSampleBlockNonMaterialized().getNames();
|
||||
|
||||
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
|
||||
return std::make_shared<DistributedSink>(
|
||||
local_context, *this, metadata_snapshot,
|
||||
createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
|
||||
cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
|
||||
local_context, *this, metadata_snapshot, cluster, insert_sync, timeout,
|
||||
StorageID{remote_database, remote_table}, columns_to_send);
|
||||
}
|
||||
|
||||
|
||||
|
@ -0,0 +1,18 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</yandex>
|
116
tests/integration/test_sharding_key_from_default_column/test.py
Normal file
116
tests/integration/test_sharding_key_from_default_column/test.py
Normal file
@ -0,0 +1,116 @@
|
||||
import pytest
|
||||
import itertools
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance('node1', main_configs=['configs/test_cluster.xml'], with_zookeeper=True)
|
||||
node2 = cluster.add_instance('node2', main_configs=['configs/test_cluster.xml'], with_zookeeper=True)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup_after_test():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
node1.query("DROP TABLE IF EXISTS dist ON CLUSTER 'test_cluster'")
|
||||
node1.query("DROP TABLE IF EXISTS local ON CLUSTER 'test_cluster'")
|
||||
|
||||
|
||||
# A default column is used in the sharding key expression.
|
||||
def test_default_column():
|
||||
node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 DEFAULT x + 100, z Int32 DEFAULT x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
|
||||
node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 DEFAULT x + 200, z Int32 DEFAULT x - y) ENGINE = MergeTree() ORDER BY y")
|
||||
|
||||
for insert_sync in [0, 1]:
|
||||
settings = {'insert_distributed_sync': insert_sync}
|
||||
|
||||
# INSERT INTO TABLE dist (x)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
|
||||
node1.query("SYSTEM FLUSH DISTRIBUTED dist")
|
||||
assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 102, 104], [4, 104, 108]])
|
||||
assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 101, 102], [3, 103, 106]])
|
||||
assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 102, 104], [4, 104, 108], [1, 101, 102], [3, 103, 106]])
|
||||
|
||||
# INSERT INTO TABLE dist (x, y)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
node1.query("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
|
||||
node1.query("SYSTEM FLUSH DISTRIBUTED dist")
|
||||
assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 22, 24]])
|
||||
assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 11, 12], [3, 33, 36]])
|
||||
assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 22, 24], [1, 11, 12], [3, 33, 36]])
|
||||
|
||||
|
||||
# A materialized column is used in the sharding key expression and `insert_allow_materialized_columns` set to 1.
|
||||
def test_materialized_column_allow_insert_materialized():
|
||||
node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 100, z Int32 MATERIALIZED x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
|
||||
node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 200, z Int32 MATERIALIZED x - y) ENGINE = MergeTree() ORDER BY y")
|
||||
|
||||
for insert_sync in [0, 1]:
|
||||
settings = {'insert_distributed_sync': insert_sync, 'insert_allow_materialized_columns': 1}
|
||||
|
||||
# INSERT INTO TABLE dist (x)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
|
||||
node1.query("SYSTEM FLUSH DISTRIBUTED dist")
|
||||
assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 102, 104], [4, 104, 108]])
|
||||
assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 101, 102], [3, 103, 106]])
|
||||
assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 102, 104], [4, 104, 108], [1, 101, 102], [3, 103, 106]])
|
||||
|
||||
# INSERT INTO TABLE dist (x, y)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
node1.query("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
|
||||
node1.query("SYSTEM FLUSH DISTRIBUTED dist")
|
||||
assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 22, 24]])
|
||||
assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 11, 12], [3, 33, 36]])
|
||||
assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 22, 24], [1, 11, 12], [3, 33, 36]])
|
||||
|
||||
|
||||
# A materialized column is used in the sharding key expression and `insert_allow_materialized_columns` set to 0.
|
||||
def test_materialized_column_disallow_insert_materialized():
|
||||
node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 100, z Int32 MATERIALIZED x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
|
||||
node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 200, z Int32 MATERIALIZED x - y) ENGINE = MergeTree() ORDER BY y")
|
||||
|
||||
for insert_sync in [0, 1]:
|
||||
settings = {'insert_distributed_sync': insert_sync, 'insert_allow_materialized_columns': 0}
|
||||
|
||||
# INSERT INTO TABLE dist (x)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
|
||||
node1.query("SYSTEM FLUSH DISTRIBUTED dist")
|
||||
assert node1.query("SELECT x, y, z FROM local") == TSV([[2, 202, -200], [4, 204, -200]])
|
||||
assert node2.query("SELECT x, y, z FROM local") == TSV([[1, 201, -200], [3, 203, -200]])
|
||||
assert node1.query("SELECT x, y, z FROM dist") == TSV([[2, 202, -200], [4, 204, -200], [1, 201, -200], [3, 203, -200]])
|
||||
|
||||
# INSERT INTO TABLE dist (x, y)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
expected_error = "Cannot insert column y, because it is MATERIALIZED column"
|
||||
assert expected_error in node1.query_and_get_error("INSERT INTO TABLE dist (x, y) VALUES (1, 11), (2, 22), (3, 33)", settings=settings)
|
||||
|
||||
|
||||
# Almost the same as the previous test `test_materialized_column_disallow_insert_materialized`, but the sharding key has different values.
|
||||
def test_materialized_column_disallow_insert_materialized_different_shards():
|
||||
node1.query("CREATE TABLE dist ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 101, z Int32 MATERIALIZED x + y) ENGINE = Distributed('test_cluster', currentDatabase(), local, y)")
|
||||
node1.query("CREATE TABLE local ON CLUSTER 'test_cluster' (x Int32, y Int32 MATERIALIZED x + 200, z Int32 MATERIALIZED x - y) ENGINE = MergeTree() ORDER BY y")
|
||||
|
||||
for insert_sync in [0, 1]:
|
||||
settings = {'insert_distributed_sync': insert_sync, 'insert_allow_materialized_columns': 0}
|
||||
|
||||
# INSERT INTO TABLE dist (x)
|
||||
node1.query("TRUNCATE TABLE local ON CLUSTER 'test_cluster'")
|
||||
node1.query("INSERT INTO TABLE dist (x) VALUES (1), (2), (3), (4)", settings=settings)
|
||||
node1.query("SYSTEM FLUSH DISTRIBUTED dist")
|
||||
assert node1.query("SELECT x, y, z FROM local") == TSV([[1, 201, -200], [3, 203, -200]])
|
||||
assert node2.query("SELECT x, y, z FROM local") == TSV([[2, 202, -200], [4, 204, -200]])
|
||||
assert node1.query("SELECT x, y, z FROM dist") == TSV([[1, 201, -200], [3, 203, -200], [2, 202, -200], [4, 204, -200]])
|
Loading…
Reference in New Issue
Block a user