Merge branch 'master' into new-nav

Rich Raposa 2023-03-07 14:06:37 -07:00 committed by GitHub
commit 67c16195cc
11 changed files with 95 additions and 5 deletions


@@ -125,6 +125,10 @@ Groups are flexible and synced on the cluster. For instance, if you have 10 topi
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.
:::info
Kafka Engine doesn't support columns with a default expression (`DEFAULT`, `MATERIALIZED`, `EPHEMERAL`, or `ALIAS`). If you need such columns, add them at the `MATERIALIZED VIEW` level instead.
:::
When the `MATERIALIZED VIEW` is attached to the engine, it starts collecting data in the background. This allows you to continually receive messages from Kafka and convert them to the required format using `SELECT`.
One Kafka table can have as many materialized views as you like. They do not read data from the Kafka table directly, but receive new records (in blocks); this way you can write to several tables with different levels of detail (with grouping/aggregation and without).
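A minimal sketch of this pattern (table names, schema, and broker address are illustrative):

```sql
-- 1. A table with the Kafka engine acting as the consumer.
CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

-- 2. A table with the desired structure.
CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
) ENGINE = SummingMergeTree
ORDER BY (day, level);

-- 3. A materialized view that converts data from the engine
--    and puts it into the previously created table.
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
FROM queue
GROUP BY day, level;
```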


@@ -124,6 +124,10 @@ If the data type and default expression are defined explicitly, this expression
Default expressions may be defined as an arbitrary expression from table constants and columns. When creating and changing the table structure, it checks that expressions do not contain loops. For INSERT, it checks that expressions are resolvable, that is, that all columns they can be calculated from have been passed.
:::info
Kafka Engine doesn't support columns with a default expression (`DEFAULT`, `MATERIALIZED`, `EPHEMERAL`, or `ALIAS`). If you need such columns, add them at the `MATERIALIZED VIEW` level instead; see [Kafka Engine](../../../engines/table-engines/integrations/kafka.md#description).
:::
### DEFAULT
`DEFAULT expr`
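For example, a sketch of a default expression calculated from another column (table and column names are illustrative):

```sql
CREATE TABLE events
(
    ts DateTime,
    day Date DEFAULT toDate(ts)
)
ENGINE = MergeTree
ORDER BY ts;

-- `day` is not passed here, so it is calculated from `ts` at insert time.
INSERT INTO events (ts) VALUES (now());
```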


@@ -161,6 +161,8 @@ public:
    if (curr_process.processed)
        continue;
    LOG_DEBUG(&Poco::Logger::get("KillQuery"), "Will kill query {} (synchronously)", curr_process.query_id);
    auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true);
    if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent)
@@ -226,6 +228,8 @@ BlockIO InterpreterKillQueryQuery::execute()
    MutableColumns res_columns = header.cloneEmptyColumns();
    for (const auto & query_desc : queries_to_stop)
    {
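        /// With `KILL QUERY ... TEST` the queries are only listed, not actually cancelled,
        /// so there is nothing to log.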
        if (!query.test)
            LOG_DEBUG(&Poco::Logger::get("KillQuery"), "Will kill query {} (asynchronously)", query_desc.query_id);
        auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
        insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
    }


@@ -187,7 +187,7 @@ void PushingAsyncPipelineExecutor::push(Chunk chunk)
    if (!is_pushed)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Pipeline for PushingPipelineExecutor was finished before all data was inserted");
            "Pipeline for PushingAsyncPipelineExecutor was finished before all data was inserted");
}
void PushingAsyncPipelineExecutor::push(Block block)


@@ -383,6 +383,15 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
    return ret;
}

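/// Columns that have an explicit default expression of any kind, e.g. one set in the CREATE TABLE statement.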
NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
{
    NamesAndTypesList ret;
    for (const auto & col : columns)
        if (col.default_desc.expression)
            ret.emplace_back(col.name, col.type);
    return ret;
}

NamesAndTypesList ColumnsDescription::getAll() const
{
    NamesAndTypesList ret;


@@ -132,6 +132,9 @@ public:
    NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
    NamesAndTypesList getAliases() const;
    NamesAndTypesList getEphemeral() const;

    /// Columns with a preset default expression, for example from the `CREATE TABLE` statement.
    NamesAndTypesList getWithDefaultExpression() const;

    NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
    NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral

    /// Returns .size0/.null/...


@@ -959,6 +959,11 @@ void registerStorageKafka(StorageFactory & factory)
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
        }

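        /// Reject any column with a default kind: MATERIALIZED/ALIAS/EPHEMERAL columns make
        /// getOrdinary() differ from getAll(), and DEFAULT columns are reported by getWithDefaultExpression().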
        if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
        {
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
                                                       "See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
        }

        return std::make_shared<StorageKafka>(args.table_id, args.getContext(), args.columns, std::move(kafka_settings), collection_name);
    };


@@ -8434,7 +8434,11 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
        }
        else if (error_code == Coordination::Error::ZNONODE)
        {
            LOG_TRACE(logger, "Node with parent zookeeper lock {} for part {} doesn't exist (part was unlocked before)", zookeeper_part_uniq_node, part_name);
            /// We don't know what to do, because this part can be a mutation part
            /// with hardlinked columns. Since we don't have this information (about which blobs not to remove),
            /// we refuse to remove the blobs.
            LOG_WARNING(logger, "Node with parent zookeeper lock {} for part {} doesn't exist (part was unlocked before), refuse to remove blobs", zookeeper_part_uniq_node, part_name);
            return {false, {}};
        }
        else
        {


@@ -594,8 +594,6 @@ def test_cancel_while_processing_input():
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQueryWithStreamInput(send_query_info())
    assert result.cancelled == True
    assert result.progress.written_rows == 6
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"

def test_cancel_while_generating_output():


@@ -285,6 +285,56 @@ def avro_confluent_message(schema_registry_client, value):
# Tests
def test_kafka_prohibited_column_types(kafka_cluster):
    def assert_returned_exception(e):
        assert e.value.returncode == 36
        assert (
            "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
            in str(e.value)
        )

    # check column with DEFAULT expression
    with pytest.raises(QueryRuntimeException) as exception:
        instance.query(
            """
            CREATE TABLE test.kafka (a Int, b Int DEFAULT 0)
            ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
            """
        )
    assert_returned_exception(exception)

    # check EPHEMERAL
    with pytest.raises(QueryRuntimeException) as exception:
        instance.query(
            """
            CREATE TABLE test.kafka (a Int, b Int EPHEMERAL)
            ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
            """
        )
    assert_returned_exception(exception)

    # check ALIAS
    with pytest.raises(QueryRuntimeException) as exception:
        instance.query(
            """
            CREATE TABLE test.kafka (a Int, b String ALIAS toString(a))
            ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
            """
        )
    assert_returned_exception(exception)

    # check MATERIALIZED
    with pytest.raises(QueryRuntimeException) as exception:
        instance.query(
            """
            CREATE TABLE test.kafka (a Int, b String MATERIALIZED toString(a))
            ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
            """
        )
    assert_returned_exception(exception)

def test_kafka_settings_old_syntax(kafka_cluster):
    assert TSV(
        instance.query(


@@ -49,7 +49,16 @@ insert_client_opts=(
timeout 250s $CLICKHOUSE_CLIENT "${client_opts[@]}" "${insert_client_opts[@]}" -q "insert into function remote('127.2', currentDatabase(), in_02232) select * from numbers(1e6)"
# Kill underlying query of remote() to make KILL faster
timeout 60s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE Settings['log_comment'] = '$CLICKHOUSE_LOG_COMMENT' SYNC" --format Null
# This test reproduces very interesting behaviour.
# The block size is 1, so the secondary query creates an InterpreterSelectQuery for each row due to pushing to the MV.
# It works extremely slowly, and the initial query produces new blocks and writes them to the socket much faster
# than the secondary query can read and process them. Therefore, it fills the network buffers in the kernel.
# Once a buffer in the kernel is full, send(...) blocks until the secondary query finishes processing the data
# that it already has in ReadBufferFromPocoSocket and calls recv.
# Or until the kernel decides to resize the buffer (it seems to have non-trivial rules for that).
# Anyway, it may look like the initial query got stuck, but actually it did not.
# Moreover, the initial query cannot be killed at that point, so KILL QUERY ... SYNC will get "stuck" as well.
timeout 30s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE query like '%INSERT INTO $CLICKHOUSE_DATABASE.in_02232%' SYNC" --format Null
echo $?
$CLICKHOUSE_CLIENT "${client_opts[@]}" -nm -q "