diff --git a/docs/en/engines/table-engines/integrations/rabbitmq.md b/docs/en/engines/table-engines/integrations/rabbitmq.md index 0f3fef3d6fb..a4d0cf78066 100644 --- a/docs/en/engines/table-engines/integrations/rabbitmq.md +++ b/docs/en/engines/table-engines/integrations/rabbitmq.md @@ -18,8 +18,8 @@ This engine allows integrating ClickHouse with [RabbitMQ](https://www.rabbitmq.c ``` sql CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( - name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], - name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], + name1 [type1], + name2 [type2], ... ) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'host:port' [or rabbitmq_address = 'amqp(s)://guest:guest@localhost/vhost'], @@ -198,6 +198,10 @@ Additional virtual columns when `kafka_handle_error_mode='stream'`: Note: `_raw_message` and `_error` virtual columns are filled only in case of exception during parsing, they are always `NULL` when message was parsed successfully. +## Caveats {#caveats} + +Even though you may specify [default column expressions](/docs/en/sql-reference/statements/create/table.md/#default_values) (such as `DEFAULT`, `MATERIALIZED`, `ALIAS`) in the table definition, these will be ignored. Instead, the columns will be filled with their respective default values for their types. + ## Data formats support {#data-formats-support} RabbitMQ engine supports all [formats](../../../interfaces/formats.md) supported in ClickHouse. diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 70f9e0c51da..3e8bb268fe7 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -151,7 +151,7 @@ Block InterpreterInsertQuery::getSampleBlock( names.emplace_back(std::move(current_name)); } - return getSampleBlock(names, table, metadata_snapshot, allow_materialized); + return getSampleBlockImpl(names, table, metadata_snapshot, no_destination, allow_materialized); } std::optional InterpreterInsertQuery::getInsertColumnNames() const @@ -173,13 +173,18 @@ std::optional InterpreterInsertQuery::getInsertColumnNames() const return names; } -Block InterpreterInsertQuery::getSampleBlock( +Block InterpreterInsertQuery::getSampleBlockImpl( const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, + bool no_destination, bool allow_materialized) { Block table_sample_physical = metadata_snapshot->getSampleBlock(); + Block table_sample_virtuals; + if (no_destination) + table_sample_virtuals = table->getVirtualsHeader(); + Block table_sample_insertable = metadata_snapshot->getSampleBlockInsertable(); Block res; for (const auto & current_name : names) @@ -194,13 +199,19 @@ Block InterpreterInsertQuery::getSampleBlock( if (table_sample_physical.has(current_name)) { if (!allow_materialized) - throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert column {}, because it is MATERIALIZED column.", - current_name); + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert column {}, because it is MATERIALIZED column", current_name); res.insert(ColumnWithTypeAndName(table_sample_physical.getByName(current_name).type, current_name)); } - else /// The table does not have a column with that name + else if (table_sample_virtuals.has(current_name)) + { + res.insert(ColumnWithTypeAndName(table_sample_virtuals.getByName(current_name).type, current_name)); + } + else + { + /// The table does not have a column with that name throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE, "No such column {} in table {}", current_name, table->getStorageID().getNameForLogs()); + } } else res.insert(ColumnWithTypeAndName(table_sample_insertable.getByName(current_name).type, current_name)); @@ -276,7 +287,7 @@ Chain InterpreterInsertQuery::buildChain( if (!running_group) running_group = std::make_shared(getContext()); - auto sample = getSampleBlock(columns, table, metadata_snapshot, allow_materialized); + auto sample = getSampleBlockImpl(columns, table, metadata_snapshot, no_destination, allow_materialized); if (check_access) getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames()); diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 3647126afb9..bf73fb2a319 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -69,7 +69,7 @@ public: bool shouldAddSquashingFroStorage(const StoragePtr & table) const; private: - static Block getSampleBlock(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, bool allow_materialized); + static Block getSampleBlockImpl(const Names & names, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot, bool no_destination, bool allow_materialized); ASTPtr query_ptr; const bool allow_materialized; diff --git a/src/Storages/RabbitMQ/RabbitMQSource.cpp b/src/Storages/RabbitMQ/RabbitMQSource.cpp index 4dc257074f3..09c1bf1b2e7 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.cpp +++ b/src/Storages/RabbitMQ/RabbitMQSource.cpp @@ -11,11 +11,21 @@ namespace DB { -static std::pair getHeaders(const StorageSnapshotPtr & storage_snapshot) +static std::pair getHeaders(const StorageSnapshotPtr & storage_snapshot, const Names & column_names) { + auto all_columns_header = storage_snapshot->metadata->getSampleBlock(); + auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized(); auto virtual_header = storage_snapshot->virtual_columns->getSampleBlock(); + for (const auto & column_name : column_names) + { + if (non_virtual_header.has(column_name) || virtual_header.has(column_name)) + continue; + const auto & column = all_columns_header.getByName(column_name); + non_virtual_header.insert(column); + } + return {non_virtual_header, virtual_header}; } @@ -40,7 +50,7 @@ RabbitMQSource::RabbitMQSource( : RabbitMQSource( storage_, storage_snapshot_, - getHeaders(storage_snapshot_), + getHeaders(storage_snapshot_, columns), context_, columns, max_block_size_, diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 980fccd307e..b882fd2728c 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -133,6 +134,9 @@ StorageRabbitMQ::StorageRabbitMQ( if (configuration.secure) SSL_library_init(); + if (!columns_.getMaterialized().empty() || !columns_.getAliases().empty() || !columns_.getDefaults().empty() || !columns_.getEphemeral().empty()) + context_->addWarningMessage("RabbitMQ table engine doesn't support ALIAS, DEFAULT or MATERIALIZED columns. They will be ignored and filled with default values"); + StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); setInMemoryMetadata(storage_metadata); @@ -1055,18 +1059,7 @@ bool StorageRabbitMQ::tryStreamToViews() if (!table) throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs()); - // Create an INSERT query for streaming data - auto insert = std::make_shared(); - insert->table_id = table_id; - - // Only insert into dependent views and expect that input blocks contain virtual columns - InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true); - auto block_io = interpreter.execute(); - auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext()); - auto column_names = block_io.pipeline.getHeader().getNames(); - auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names); - auto block_size = getMaxBlockSize(); // Create a stream for each consumer and join them in a union stream @@ -1082,13 +1075,29 @@ bool StorageRabbitMQ::tryStreamToViews() for (size_t i = 0; i < num_created_consumers; ++i) { auto source = std::make_shared( - *this, storage_snapshot, rabbitmq_context, column_names, block_size, - max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false); + *this, storage_snapshot, rabbitmq_context, Names{}, block_size, + max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode); sources.emplace_back(source); pipes.emplace_back(source); } + // Create an INSERT query for streaming data + auto insert = std::make_shared(); + insert->table_id = table_id; + if (!sources.empty()) + { + auto column_list = std::make_shared(); + const auto & header = sources[0]->getPort().getHeader(); + for (const auto & column : header) + column_list->children.emplace_back(std::make_shared(column.name)); + insert->columns = std::move(column_list); + } + + // Only insert into dependent views and expect that input blocks contain virtual columns + InterpreterInsertQuery interpreter(insert, rabbitmq_context, /* allow_materialized_ */ false, /* no_squash_ */ true, /* no_destination_ */ true); + auto block_io = interpreter.execute(); + block_io.pipeline.complete(Pipe::unitePipes(std::move(pipes))); std::atomic_size_t rows = 0; diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 280ce230921..0f1c5eb17dd 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -53,13 +53,13 @@ instance3 = cluster.add_instance( # Helpers -def rabbitmq_check_result(result, check=False, ref_file="test_rabbitmq_json.reference"): - fpath = p.join(p.dirname(__file__), ref_file) - with open(fpath) as reference: - if check: - assert TSV(result) == TSV(reference) - else: - return TSV(result) == TSV(reference) +def rabbitmq_check_result(result, check=False, reference=None): + if reference is None: + reference = "\n".join([f"{i}\t{i}" for i in range(50)]) + if check: + assert TSV(result) == TSV(reference) + else: + return TSV(result) == TSV(reference) def wait_rabbitmq_to_start(rabbitmq_docker_id, cookie, timeout=180): @@ -133,9 +133,10 @@ def test_rabbitmq_select(rabbitmq_cluster, secure): if secure: port = cluster.rabbitmq_secure_port + # MATERIALIZED and ALIAS columns are not supported in RabbitMQ engine, but we can test that it does not fail instance.query( """ - CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + CREATE TABLE test.rabbitmq (key UInt64, value UInt64, value2 ALIAS value + 1, value3 MATERIALIZED value + 1) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = '{}:{}', rabbitmq_exchange_name = 'select', @@ -148,6 +149,11 @@ def test_rabbitmq_select(rabbitmq_cluster, secure): ) ) + assert ( + "RabbitMQ table engine doesn\\'t support ALIAS, DEFAULT or MATERIALIZED columns" + in instance.query("SELECT * FROM system.warnings") + ) + credentials = pika.PlainCredentials("root", "clickhouse") parameters = pika.ConnectionParameters( rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials @@ -379,7 +385,7 @@ def test_rabbitmq_macros(rabbitmq_cluster): def test_rabbitmq_materialized_view(rabbitmq_cluster): instance.query( """ - CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + CREATE TABLE test.rabbitmq (key UInt64, value UInt64, dt1 DateTime MATERIALIZED now(), value2 ALIAS value + 1) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'mv', @@ -484,9 +490,11 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): """ DROP TABLE IF EXISTS test.view1; DROP TABLE IF EXISTS test.view2; + DROP TABLE IF EXISTS test.view3; DROP TABLE IF EXISTS test.consumer1; DROP TABLE IF EXISTS test.consumer2; - CREATE TABLE test.rabbitmq (key UInt64, value UInt64) + DROP TABLE IF EXISTS test.consumer3; + CREATE TABLE test.rabbitmq (key UInt64, value UInt64, value2 ALIAS value + 1, value3 MATERIALIZED value + 1, value4 DEFAULT 1) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', rabbitmq_exchange_name = 'mmv', @@ -497,13 +505,18 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): CREATE TABLE test.view1 (key UInt64, value UInt64) ENGINE = MergeTree() ORDER BY key; - CREATE TABLE test.view2 (key UInt64, value UInt64) + CREATE TABLE test.view2 (key UInt64, value UInt64, value2 UInt64, value3 UInt64, value4 UInt64) + ENGINE = MergeTree() + ORDER BY key; + CREATE TABLE test.view3 (key UInt64) ENGINE = MergeTree() ORDER BY key; CREATE MATERIALIZED VIEW test.consumer1 TO test.view1 AS SELECT * FROM test.rabbitmq; CREATE MATERIALIZED VIEW test.consumer2 TO test.view2 AS SELECT * FROM test.rabbitmq; + CREATE MATERIALIZED VIEW test.consumer3 TO test.view3 AS + SELECT * FROM test.rabbitmq; """ ) @@ -514,7 +527,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): connection = pika.BlockingConnection(parameters) channel = connection.channel() - instance.wait_for_log_line("Started streaming to 2 attached views") + instance.wait_for_log_line("Started streaming to 3 attached views") messages = [] for i in range(50): @@ -522,24 +535,43 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): for message in messages: channel.basic_publish(exchange="mmv", routing_key="", body=message) - while True: + is_check_passed = False + deadline = time.monotonic() + 60 + while time.monotonic() < deadline: result1 = instance.query("SELECT * FROM test.view1 ORDER BY key") result2 = instance.query("SELECT * FROM test.view2 ORDER BY key") - if rabbitmq_check_result(result1) and rabbitmq_check_result(result2): + result3 = instance.query("SELECT * FROM test.view3 ORDER BY key") + # Note that for view2 result is `i i 0 0 0`, but not `i i i+1 i+1 1` as expected, ALIAS/MATERIALIZED/DEFAULT columns are not supported in RabbitMQ engine + # We onlt check that at least it do not fail + if ( + rabbitmq_check_result(result1) + and rabbitmq_check_result( + result2, reference="\n".join([f"{i}\t{i}\t0\t0\t0" for i in range(50)]) + ) + and rabbitmq_check_result( + result3, reference="\n".join([str(i) for i in range(50)]) + ) + ): + is_check_passed = True break + time.sleep(0.1) + + assert ( + is_check_passed + ), f"References are not equal to results, result1: {result1}, result2: {result2}, result3: {result3}" instance.query( """ DROP TABLE test.consumer1; DROP TABLE test.consumer2; + DROP TABLE test.consumer3; DROP TABLE test.view1; DROP TABLE test.view2; + DROP TABLE test.view3; """ ) connection.close() - rabbitmq_check_result(result1, True) - rabbitmq_check_result(result2, True) def test_rabbitmq_big_message(rabbitmq_cluster): diff --git a/tests/integration/test_storage_rabbitmq/test_rabbitmq_json.reference b/tests/integration/test_storage_rabbitmq/test_rabbitmq_json.reference deleted file mode 100644 index 959bb2aad74..00000000000 --- a/tests/integration/test_storage_rabbitmq/test_rabbitmq_json.reference +++ /dev/null @@ -1,50 +0,0 @@ -0 0 -1 1 -2 2 -3 3 -4 4 -5 5 -6 6 -7 7 -8 8 -9 9 -10 10 -11 11 -12 12 -13 13 -14 14 -15 15 -16 16 -17 17 -18 18 -19 19 -20 20 -21 21 -22 22 -23 23 -24 24 -25 25 -26 26 -27 27 -28 28 -29 29 -30 30 -31 31 -32 32 -33 33 -34 34 -35 35 -36 36 -37 37 -38 38 -39 39 -40 40 -41 41 -42 42 -43 43 -44 44 -45 45 -46 46 -47 47 -48 48 -49 49