Merge pull request #61320 from ClickHouse/vdimir/fix_rabbitmq_logical_error

Fix logical error in RabbitMQ storage with MATERIALIZED columns
This commit is contained in:
vdimir 2024-03-14 13:28:05 +01:00 committed by GitHub
commit 0b97f3ac16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 106 additions and 90 deletions

View File

@ -18,8 +18,8 @@ This engine allows integrating ClickHouse with [RabbitMQ](https://www.rabbitmq.c
``` sql ``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
( (
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name1 [type1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], name2 [type2],
... ...
) ENGINE = RabbitMQ SETTINGS ) ENGINE = RabbitMQ SETTINGS
rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'], rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'],
@ -198,6 +198,10 @@ Additional virtual columns when `rabbitmq_handle_error_mode='stream'`:
Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always `NULL` when message was parsed successfully. Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always `NULL` when message was parsed successfully.
## Caveats {#caveats}
Even though you may specify [default column expressions](/docs/en/sql-reference/statements/create/table.md/#default_values) (such as `DEFAULT`, `MATERIALIZED`, `ALIAS`) in the table definition, these will be ignored. Instead, the columns will be filled with their respective default values for their types.
## Data formats support {#data-formats-support} ## Data formats support {#data-formats-support}
RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse. RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse.

View File

@ -151,7 +151,7 @@ Block InterpreterInsertQuery::getSampleBlock(
names.emplace_back(std::move(current_name)); names.emplace_back(std::move(current_name));
} }
return getSampleBlock(names, table, metadata_snapshot, allow_materialized); return getSampleBlockImpl(names, table, metadata_snapshot, no_destination, allow_materialized);
} }
std::optional<Names> InterpreterInsertQuery::getInsertColumnNames() const std::optional<Names> InterpreterInsertQuery::getInsertColumnNames() const
@ -173,13 +173,18 @@ std::optional<Names> InterpreterInsertQuery::getInsertColumnNames() const
return names; return names;
} }
Block InterpreterInsertQuery::getSampleBlock( Block InterpreterInsertQuery::getSampleBlockImpl(
const Names & names, const Names & names,
const StoragePtr & table, const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
bool no_destination,
bool allow_materialized) bool allow_materialized)
{ {
Block table_sample_physical = metadata_snapshot->getSampleBlock(); Block table_sample_physical = metadata_snapshot->getSampleBlock();
Block table_sample_virtuals;
if (no_destination)
table_sample_virtuals = table->getVirtualsHeader();
Block table_sample_insertable = metadata_snapshot->getSampleBlockInsertable(); Block table_sample_insertable = metadata_snapshot->getSampleBlockInsertable();
Block res; Block res;
for (const auto & current_name : names) for (const auto & current_name : names)
@ -194,13 +199,19 @@ Block InterpreterInsertQuery::getSampleBlock(
if (table_sample_physical.has(current_name)) if (table_sample_physical.has(current_name))
{ {
if (!allow_materialized) if (!allow_materialized)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert column {}, because it is MATERIALIZED column.", throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert column {}, because it is MATERIALIZED column", current_name);
current_name);
res.insert(ColumnWithTypeAndName(table_sample_physical.getByName(current_name).type, current_name)); res.insert(ColumnWithTypeAndName(table_sample_physical.getByName(current_name).type, current_name));
} }
else /// The table does not have a column with that name else if (table_sample_virtuals.has(current_name))
{
res.insert(ColumnWithTypeAndName(table_sample_virtuals.getByName(current_name).type, current_name));
}
else
{
/// The table does not have a column with that name
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "No such column {} in table {}", throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "No such column {} in table {}",
current_name, table->getStorageID().getNameForLogs()); current_name, table->getStorageID().getNameForLogs());
}
} }
else else
res.insert(ColumnWithTypeAndName(table_sample_insertable.getByName(current_name).type, current_name)); res.insert(ColumnWithTypeAndName(table_sample_insertable.getByName(current_name).type, current_name));
@ -276,7 +287,7 @@ Chain InterpreterInsertQuery::buildChain(
if (!running_group) if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext()); running_group = std::make_shared<ThreadGroup>(getContext());
auto sample = getSampleBlock(columns, table, metadata_snapshot, allow_materialized); auto sample = getSampleBlockImpl(columns, table, metadata_snapshot, no_destination, allow_materialized);
if (check_access) if (check_access)
getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames()); getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames());

View File

@ -69,7 +69,7 @@ public:
bool shouldAddSquashingFroStorage(const StoragePtr & table) const; bool shouldAddSquashingFroStorage(const StoragePtr & table) const;
private: private:
static Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, bool allow_materialized); static Block getSampleBlockImpl(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, bool no_destination, bool allow_materialized);
ASTPtr query_ptr; ASTPtr query_ptr;
const bool allow_materialized; const bool allow_materialized;

View File

@ -11,11 +11,21 @@
namespace DB namespace DB
{ {
static std::pair<Block, Block> getHeaders(const StorageSnapshotPtr & storage_snapshot) static std::pair<Block, Block> getHeaders(const StorageSnapshotPtr & storage_snapshot, const Names & column_names)
{ {
auto all_columns_header = storage_snapshot->metadata->getSampleBlock();
auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized(); auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized();
auto virtual_header = storage_snapshot->virtual_columns->getSampleBlock(); auto virtual_header = storage_snapshot->virtual_columns->getSampleBlock();
for (const auto & column_name : column_names)
{
if (non_virtual_header.has(column_name) || virtual_header.has(column_name))
continue;
const auto & column = all_columns_header.getByName(column_name);
non_virtual_header.insert(column);
}
return {non_virtual_header, virtual_header}; return {non_virtual_header, virtual_header};
} }
@ -40,7 +50,7 @@ RabbitMQSource::RabbitMQSource(
: RabbitMQSource( : RabbitMQSource(
storage_, storage_,
storage_snapshot_, storage_snapshot_,
getHeaders(storage_snapshot_), getHeaders(storage_snapshot_, columns),
context_, context_,
columns, columns,
max_block_size_, max_block_size_,

View File

@ -8,6 +8,7 @@
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h> #include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
@ -133,6 +134,9 @@ StorageRabbitMQ::StorageRabbitMQ(
if (configuration.secure) if (configuration.secure)
SSL_library_init(); SSL_library_init();
if (!columns_.getMaterialized().empty() || !columns_.getAliases().empty() || !columns_.getDefaults().empty() || !columns_.getEphemeral().empty())
context_->addWarningMessage("RabbitMQ table engine doesn't support ALIAS, DEFAULT or MATERIALIZED columns. They will be ignored and filled with default values");
StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_); storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata); setInMemoryMetadata(storage_metadata);
@ -1055,18 +1059,7 @@ bool StorageRabbitMQ::tryStreamToViews()
if (!table) if (!table)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs());
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext()); auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
auto column_names = block_io.pipeline.getHeader().getNames();
auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
auto block_size = getMaxBlockSize(); auto block_size = getMaxBlockSize();
// Create a stream for each consumer and join them in a union stream // Create a stream for each consumer and join them in a union stream
@ -1082,13 +1075,29 @@ bool StorageRabbitMQ::tryStreamToViews()
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < num_created_consumers; ++i)
{ {
auto source = std::make_shared<RabbitMQSource>( auto source = std::make_shared<RabbitMQSource>(
*this, storage_snapshot, rabbitmq_context, column_names, block_size, *this, storage_snapshot, rabbitmq_context, Names{}, block_size,
max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false); max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode);
sources.emplace_back(source); sources.emplace_back(source);
pipes.emplace_back(source); pipes.emplace_back(source);
} }
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
if (!sources.empty())
{
auto column_list = std::make_shared<ASTExpressionList>();
const auto & header = sources[0]->getPort().getHeader();
for (const auto & column : header)
column_list->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
insert->columns = std::move(column_list);
}
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, rabbitmq_context, /* allow_materialized_ */ false, /* no_squash_ */ true, /* no_destination_ */ true);
auto block_io = interpreter.execute();
block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes))); block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes)));
std::atomic_size_t rows = 0; std::atomic_size_t rows = 0;

View File

@ -53,13 +53,13 @@ instance3 = cluster.add_instance(
# Helpers # Helpers
def rabbitmq_check_result(result, check=False, ref_file="test_rabbitmq_json.reference"): def rabbitmq_check_result(result, check=False, reference=None):
fpath = p.join(p.dirname(__file__), ref_file) if reference is None:
with open(fpath) as reference: reference = "\n".join([f"{i}\t{i}" for i in range(50)])
if check: if check:
assert TSV(result) == TSV(reference) assert TSV(result) == TSV(reference)
else: else:
return TSV(result) == TSV(reference) return TSV(result) == TSV(reference)
def wait_rabbitmq_to_start(rabbitmq_docker_id, cookie, timeout=180): def wait_rabbitmq_to_start(rabbitmq_docker_id, cookie, timeout=180):
@ -133,9 +133,10 @@ def test_rabbitmq_select(rabbitmq_cluster, secure):
if secure: if secure:
port = cluster.rabbitmq_secure_port port = cluster.rabbitmq_secure_port
# MATERIALIZED and ALIAS columns are not supported in RabbitMQ engine, but we can test that it does not fail
instance.query( instance.query(
""" """
CREATE TABLE test.rabbitmq (key UInt64, value UInt64) CREATE TABLE test.rabbitmq (key UInt64, value UInt64, value2 ALIAS value + 1, value3 MATERIALIZED value + 1)
ENGINE = RabbitMQ ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{}:{}', SETTINGS rabbitmq_host_port = '{}:{}',
rabbitmq_exchange_name = 'select', rabbitmq_exchange_name = 'select',
@ -148,6 +149,11 @@ def test_rabbitmq_select(rabbitmq_cluster, secure):
) )
) )
assert (
"RabbitMQ table engine doesn\\'t support ALIAS, DEFAULT or MATERIALIZED columns"
in instance.query("SELECT * FROM system.warnings")
)
credentials = pika.PlainCredentials("root", "clickhouse") credentials = pika.PlainCredentials("root", "clickhouse")
parameters = pika.ConnectionParameters( parameters = pika.ConnectionParameters(
rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials
@ -379,7 +385,7 @@ def test_rabbitmq_macros(rabbitmq_cluster):
def test_rabbitmq_materialized_view(rabbitmq_cluster): def test_rabbitmq_materialized_view(rabbitmq_cluster):
instance.query( instance.query(
""" """
CREATE TABLE test.rabbitmq (key UInt64, value UInt64) CREATE TABLE test.rabbitmq (key UInt64, value UInt64, dt1 DateTime MATERIALIZED now(), value2 ALIAS value + 1)
ENGINE = RabbitMQ ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mv', rabbitmq_exchange_name = 'mv',
@ -484,9 +490,11 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
""" """
DROP TABLE IF EXISTS test.view1; DROP TABLE IF EXISTS test.view1;
DROP TABLE IF EXISTS test.view2; DROP TABLE IF EXISTS test.view2;
DROP TABLE IF EXISTS test.view3;
DROP TABLE IF EXISTS test.consumer1; DROP TABLE IF EXISTS test.consumer1;
DROP TABLE IF EXISTS test.consumer2; DROP TABLE IF EXISTS test.consumer2;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64) DROP TABLE IF EXISTS test.consumer3;
CREATE TABLE test.rabbitmq (key UInt64, value UInt64, value2 ALIAS value + 1, value3 MATERIALIZED value + 1, value4 DEFAULT 1)
ENGINE = RabbitMQ ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'mmv', rabbitmq_exchange_name = 'mmv',
@ -497,13 +505,18 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
CREATE TABLE test.view1 (key UInt64, value UInt64) CREATE TABLE test.view1 (key UInt64, value UInt64)
ENGINE = MergeTree() ENGINE = MergeTree()
ORDER BY key; ORDER BY key;
CREATE TABLE test.view2 (key UInt64, value UInt64) CREATE TABLE test.view2 (key UInt64, value UInt64, value2 UInt64, value3 UInt64, value4 UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE TABLE test.view3 (key UInt64)
ENGINE = MergeTree() ENGINE = MergeTree()
ORDER BY key; ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS
SELECT * FROM test.rabbitmq; SELECT * FROM test.rabbitmq;
CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS
SELECT * FROM test.rabbitmq; SELECT * FROM test.rabbitmq;
CREATE MATERIALIZED VIEW test.consumer3 TO test.view3 AS
SELECT * FROM test.rabbitmq;
""" """
) )
@ -514,7 +527,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
connection = pika.BlockingConnection(parameters) connection = pika.BlockingConnection(parameters)
channel = connection.channel() channel = connection.channel()
instance.wait_for_log_line("Started streaming to 2 attached views") instance.wait_for_log_line("Started streaming to 3 attached views")
messages = [] messages = []
for i in range(50): for i in range(50):
@ -522,24 +535,43 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster):
for message in messages: for message in messages:
channel.basic_publish(exchange="mmv", routing_key="", body=message) channel.basic_publish(exchange="mmv", routing_key="", body=message)
while True: is_check_passed = False
deadline = time.monotonic() + 60
while time.monotonic() < deadline:
result1 = instance.query("SELECT * FROM test.view1 ORDER BY key") result1 = instance.query("SELECT * FROM test.view1 ORDER BY key")
result2 = instance.query("SELECT * FROM test.view2 ORDER BY key") result2 = instance.query("SELECT * FROM test.view2 ORDER BY key")
if rabbitmq_check_result(result1) and rabbitmq_check_result(result2): result3 = instance.query("SELECT * FROM test.view3 ORDER BY key")
# Note that for view2 the result is `i i 0 0 0`, not `i i i+1 i+1 1` as one might expect: ALIAS/MATERIALIZED/DEFAULT columns are not supported in the RabbitMQ engine.
# We only check that, at the very least, it does not fail.
if (
rabbitmq_check_result(result1)
and rabbitmq_check_result(
result2, reference="\n".join([f"{i}\t{i}\t0\t0\t0" for i in range(50)])
)
and rabbitmq_check_result(
result3, reference="\n".join([str(i) for i in range(50)])
)
):
is_check_passed = True
break break
time.sleep(0.1)
assert (
is_check_passed
), f"References are not equal to results, result1: {result1}, result2: {result2}, result3: {result3}"
instance.query( instance.query(
""" """
DROP TABLE test.consumer1; DROP TABLE test.consumer1;
DROP TABLE test.consumer2; DROP TABLE test.consumer2;
DROP TABLE test.consumer3;
DROP TABLE test.view1; DROP TABLE test.view1;
DROP TABLE test.view2; DROP TABLE test.view2;
DROP TABLE test.view3;
""" """
) )
connection.close() connection.close()
rabbitmq_check_result(result1, True)
rabbitmq_check_result(result2, True)
def test_rabbitmq_big_message(rabbitmq_cluster): def test_rabbitmq_big_message(rabbitmq_cluster):

View File

@ -1,50 +0,0 @@
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
20 20
21 21
22 22
23 23
24 24
25 25
26 26
27 27
28 28
29 29
30 30
31 31
32 32
33 33
34 34
35 35
36 36
37 37
38 38
39 39
40 40
41 41
42 42
43 43
44 44
45 45
46 46
47 47
48 48
49 49