From 969cb2f826757df38dbccbaa4233e6a28a772bca Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Mon, 22 Jul 2019 14:32:11 +0300 Subject: [PATCH 1/9] Append _partition virtual column --- dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp | 13 +++++++------ .../Storages/Kafka/ReadBufferFromKafkaConsumer.h | 1 + dbms/src/Storages/Kafka/StorageKafka.cpp | 3 ++- dbms/tests/integration/test_storage_kafka/test.py | 8 ++++---- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp index 5b8d80cb062..3ddb54f848b 100644 --- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -19,7 +19,7 @@ KafkaBlockInputStream::KafkaBlockInputStream( if (!storage.getSchemaName().empty()) context.setSetting("format_schema", storage.getSchemaName()); - virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns(); + virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition"}).cloneEmptyColumns(); } KafkaBlockInputStream::~KafkaBlockInputStream() @@ -57,9 +57,10 @@ void KafkaBlockInputStream::readPrefixImpl() auto read_callback = [this] { const auto * sub_buffer = buffer->subBufferAs(); - virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic" - virtual_columns[1]->insert(sub_buffer->currentKey()); // "key" - virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset" + virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic" + virtual_columns[1]->insert(sub_buffer->currentKey()); // "key" + virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset" + virtual_columns[3]->insert(sub_buffer->currentPartition()); // "partition" }; auto child = FormatFactory::instance().getInput( @@ -76,8 +77,8 @@ Block KafkaBlockInputStream::readImpl() if (!block) return block; - Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneWithColumns(std::move(virtual_columns)); - virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset"}).cloneEmptyColumns(); + Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition"}).cloneWithColumns(std::move(virtual_columns)); + virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition"}).cloneEmptyColumns(); for (const auto & column : virtual_block.getColumnsWithTypeAndName()) block.insert(column); diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h index ac6011cfed0..0de18ba59bf 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h @@ -30,6 +30,7 @@ public: String currentTopic() const { return current[-1].get_topic(); } String currentKey() const { return current[-1].get_key(); } auto currentOffset() const { return current[-1].get_offset(); } + auto currentPartition() const { return current[-1].get_partition(); } private: using Messages = std::vector; diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp index 20599c7e4f8..e591649dd6a 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.cpp +++ b/dbms/src/Storages/Kafka/StorageKafka.cpp @@ -85,7 +85,8 @@ StorageKafka::StorageKafka( columns_, ColumnsDescription({{"_topic", std::make_shared()}, {"_key", std::make_shared()}, - {"_offset", std::make_shared()}}, 
true)) + {"_offset", std::make_shared()}, + {"_partition", std::make_shared()}}, true)) , table_name(table_name_) , database_name(database_name_) , global_context(context_) diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py index 3f38b068a22..915665ba998 100644 --- a/dbms/tests/integration/test_storage_kafka/test.py +++ b/dbms/tests/integration/test_storage_kafka/test.py @@ -380,7 +380,7 @@ def test_kafka_virtual_columns(kafka_cluster): result = '' while True: time.sleep(1) - result += instance.query('SELECT _key, key, _topic, value, _offset FROM test.kafka') + result += instance.query('SELECT _key, key, _topic, value, _offset, _partition FROM test.kafka') if kafka_check_result(result, False, 'test_kafka_virtual1.reference'): break kafka_check_result(result, True, 'test_kafka_virtual1.reference') @@ -397,11 +397,11 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): kafka_group_name = 'virt2', kafka_format = 'JSONEachRow', kafka_row_delimiter = '\\n'; - CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64) + CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64) ENGINE = MergeTree() ORDER BY key; CREATE MATERIALIZED VIEW test.consumer TO test.view AS - SELECT *, _key as kafka_key, _topic as topic, _offset as offset FROM test.kafka; + SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition FROM test.kafka; ''') messages = [] @@ -411,7 +411,7 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): while True: time.sleep(1) - result = instance.query('SELECT kafka_key, key, topic, value, offset FROM test.view') + result = instance.query('SELECT kafka_key, key, topic, value, offset, partition FROM test.view') if kafka_check_result(result, False, 'test_kafka_virtual2.reference'): break kafka_check_result(result, True, 'test_kafka_virtual2.reference') From b55da1888a310479f2b5cb5ba29f85364e0d0d72 Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Tue, 6 Aug 2019 17:18:37 +0300 Subject: [PATCH 2/9] Add "_timestamp" virtual column --- .../Storages/Kafka/KafkaBlockInputStream.cpp | 10 +++++++--- .../Kafka/ReadBufferFromKafkaConsumer.h | 1 + dbms/src/Storages/Kafka/StorageKafka.cpp | 5 ++++- .../integration/test_storage_kafka/test.py | 18 +++++++++--------- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp index 8607d1e1e81..aeecc772dce 100644 --- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -19,7 +19,7 @@ KafkaBlockInputStream::KafkaBlockInputStream( if (!storage.getSchemaName().empty()) context.setSetting("format_schema", storage.getSchemaName()); - virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition"}).cloneEmptyColumns(); + virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns(); } KafkaBlockInputStream::~KafkaBlockInputStream() @@ -64,6 +64,10 @@ void KafkaBlockInputStream::readPrefixImpl() virtual_columns[1]->insert(sub_buffer->currentKey()); // "key" virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset" virtual_columns[3]->insert(sub_buffer->currentPartition()); // "partition" + + auto timestamp = sub_buffer->currentTimestamp(); + if (timestamp) + 
virtual_columns[4]->insert(std::chrono::duration_cast(timestamp->get_timestamp()).count()); // "timestamp" }; auto child = FormatFactory::instance().getInput( @@ -80,8 +84,8 @@ Block KafkaBlockInputStream::readImpl() if (!block) return block; - Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition"}).cloneWithColumns(std::move(virtual_columns)); - virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition"}).cloneEmptyColumns(); + Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneWithColumns(std::move(virtual_columns)); + virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns(); for (const auto & column : virtual_block.getColumnsWithTypeAndName()) block.insert(column); diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h index d1ea51b896e..a8295152d91 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h @@ -31,6 +31,7 @@ public: String currentKey() const { return current[-1].get_key(); } auto currentOffset() const { return current[-1].get_offset(); } auto currentPartition() const { return current[-1].get_partition(); } + auto currentTimestamp() const { return current[-1].get_timestamp(); } private: using Messages = std::vector; diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp index e591649dd6a..7cfcb4caf10 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.cpp +++ b/dbms/src/Storages/Kafka/StorageKafka.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -86,7 +88,8 @@ StorageKafka::StorageKafka( ColumnsDescription({{"_topic", std::make_shared()}, {"_key", std::make_shared()}, {"_offset", std::make_shared()}, - {"_partition", std::make_shared()}}, true)) + {"_partition", std::make_shared()}, + {"_timestamp", std::make_shared(std::make_shared())}}, true)) , table_name(table_name_) , database_name(database_name_) , global_context(context_) diff --git a/dbms/tests/integration/test_storage_kafka/test.py b/dbms/tests/integration/test_storage_kafka/test.py index 8ab046b894b..7c7a30c9fa8 100644 --- a/dbms/tests/integration/test_storage_kafka/test.py +++ b/dbms/tests/integration/test_storage_kafka/test.py @@ -61,10 +61,10 @@ def wait_kafka_is_available(max_retries=50): time.sleep(1) -def kafka_produce(topic, messages): +def kafka_produce(topic, messages, timestamp=None): producer = KafkaProducer(bootstrap_servers="localhost:9092") for message in messages: - producer.send(topic=topic, value=message) + producer.send(topic=topic, value=message, timestamp_ms=timestamp) producer.flush() print ("Produced {} messages for topic {}".format(len(messages), topic)) @@ -376,16 +376,16 @@ def test_kafka_virtual_columns(kafka_cluster): messages = '' for i in range(25): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce('virt1', [messages]) + kafka_produce('virt1', [messages], 0) messages = '' for i in range(25, 50): messages += json.dumps({'key': i, 'value': i}) + '\n' - kafka_produce('virt1', [messages]) + kafka_produce('virt1', [messages], 0) result = '' for i in range(50): - result += instance.query('SELECT _key, key, _topic, value, _offset, _partition FROM test.kafka') + result += instance.query('SELECT _key, key, _topic, value, _offset, 
_partition, _timestamp FROM test.kafka') if kafka_check_result(result, False, 'test_kafka_virtual1.reference'): break time.sleep(0.5) @@ -403,20 +403,20 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster): kafka_group_name = 'virt2', kafka_format = 'JSONEachRow', kafka_row_delimiter = '\\n'; - CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64) + CREATE TABLE test.view (key UInt64, value UInt64, kafka_key String, topic String, offset UInt64, partition UInt64, timestamp Nullable(DateTime)) ENGINE = MergeTree() ORDER BY key; CREATE MATERIALIZED VIEW test.consumer TO test.view AS - SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition FROM test.kafka; + SELECT *, _key as kafka_key, _topic as topic, _offset as offset, _partition as partition, _timestamp as timestamp FROM test.kafka; ''') messages = [] for i in range(50): messages.append(json.dumps({'key': i, 'value': i})) - kafka_produce('virt2', messages) + kafka_produce('virt2', messages, 0) for i in range(50): - result = instance.query('SELECT kafka_key, key, topic, value, offset, partition FROM test.view') + result = instance.query('SELECT kafka_key, key, topic, value, offset, partition, timestamp FROM test.view') if kafka_check_result(result, False, 'test_kafka_virtual2.reference'): break time.sleep(0.5) From 0bc6847111aee4ca7175fd19bdbf504bf18427fa Mon Sep 17 00:00:00 2001 From: Ivan Lezhankin Date: Wed, 7 Aug 2019 12:54:29 +0300 Subject: [PATCH 3/9] Update references --- .../test_kafka_virtual1.reference | 100 +++++++++--------- .../test_kafka_virtual2.reference | 100 +++++++++--------- 2 files changed, 100 insertions(+), 100 deletions(-) diff --git a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference index 5956210d25e..30c2f6e1c59 100644 --- a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference +++ b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual1.reference @@ -1,50 +1,50 @@ - 0 virt1 0 0 - 1 virt1 1 0 - 2 virt1 2 0 - 3 virt1 3 0 - 4 virt1 4 0 - 5 virt1 5 0 - 6 virt1 6 0 - 7 virt1 7 0 - 8 virt1 8 0 - 9 virt1 9 0 - 10 virt1 10 0 - 11 virt1 11 0 - 12 virt1 12 0 - 13 virt1 13 0 - 14 virt1 14 0 - 15 virt1 15 0 - 16 virt1 16 0 - 17 virt1 17 0 - 18 virt1 18 0 - 19 virt1 19 0 - 20 virt1 20 0 - 21 virt1 21 0 - 22 virt1 22 0 - 23 virt1 23 0 - 24 virt1 24 0 - 25 virt1 25 1 - 26 virt1 26 1 - 27 virt1 27 1 - 28 virt1 28 1 - 29 virt1 29 1 - 30 virt1 30 1 - 31 virt1 31 1 - 32 virt1 32 1 - 33 virt1 33 1 - 34 virt1 34 1 - 35 virt1 35 1 - 36 virt1 36 1 - 37 virt1 37 1 - 38 virt1 38 1 - 39 virt1 39 1 - 40 virt1 40 1 - 41 virt1 41 1 - 42 virt1 42 1 - 43 virt1 43 1 - 44 virt1 44 1 - 45 virt1 45 1 - 46 virt1 46 1 - 47 virt1 47 1 - 48 virt1 48 1 - 49 virt1 49 1 + 0 virt1 0 0 0 0000-00-00 00:00:00 + 1 virt1 1 0 0 0000-00-00 00:00:00 + 2 virt1 2 0 0 0000-00-00 00:00:00 + 3 virt1 3 0 0 0000-00-00 00:00:00 + 4 virt1 4 0 0 0000-00-00 00:00:00 + 5 virt1 5 0 0 0000-00-00 00:00:00 + 6 virt1 6 0 0 0000-00-00 00:00:00 + 7 virt1 7 0 0 0000-00-00 00:00:00 + 8 virt1 8 0 0 0000-00-00 00:00:00 + 9 virt1 9 0 0 0000-00-00 00:00:00 + 10 virt1 10 0 0 0000-00-00 00:00:00 + 11 virt1 11 0 0 0000-00-00 00:00:00 + 12 virt1 12 0 0 0000-00-00 00:00:00 + 13 virt1 13 0 0 0000-00-00 00:00:00 + 14 virt1 14 0 0 0000-00-00 00:00:00 + 15 virt1 15 0 0 0000-00-00 00:00:00 + 16 virt1 16 0 0 0000-00-00 00:00:00 + 17 virt1 17 0 0 0000-00-00 00:00:00 + 
18 virt1 18 0 0 0000-00-00 00:00:00 + 19 virt1 19 0 0 0000-00-00 00:00:00 + 20 virt1 20 0 0 0000-00-00 00:00:00 + 21 virt1 21 0 0 0000-00-00 00:00:00 + 22 virt1 22 0 0 0000-00-00 00:00:00 + 23 virt1 23 0 0 0000-00-00 00:00:00 + 24 virt1 24 0 0 0000-00-00 00:00:00 + 25 virt1 25 1 0 0000-00-00 00:00:00 + 26 virt1 26 1 0 0000-00-00 00:00:00 + 27 virt1 27 1 0 0000-00-00 00:00:00 + 28 virt1 28 1 0 0000-00-00 00:00:00 + 29 virt1 29 1 0 0000-00-00 00:00:00 + 30 virt1 30 1 0 0000-00-00 00:00:00 + 31 virt1 31 1 0 0000-00-00 00:00:00 + 32 virt1 32 1 0 0000-00-00 00:00:00 + 33 virt1 33 1 0 0000-00-00 00:00:00 + 34 virt1 34 1 0 0000-00-00 00:00:00 + 35 virt1 35 1 0 0000-00-00 00:00:00 + 36 virt1 36 1 0 0000-00-00 00:00:00 + 37 virt1 37 1 0 0000-00-00 00:00:00 + 38 virt1 38 1 0 0000-00-00 00:00:00 + 39 virt1 39 1 0 0000-00-00 00:00:00 + 40 virt1 40 1 0 0000-00-00 00:00:00 + 41 virt1 41 1 0 0000-00-00 00:00:00 + 42 virt1 42 1 0 0000-00-00 00:00:00 + 43 virt1 43 1 0 0000-00-00 00:00:00 + 44 virt1 44 1 0 0000-00-00 00:00:00 + 45 virt1 45 1 0 0000-00-00 00:00:00 + 46 virt1 46 1 0 0000-00-00 00:00:00 + 47 virt1 47 1 0 0000-00-00 00:00:00 + 48 virt1 48 1 0 0000-00-00 00:00:00 + 49 virt1 49 1 0 0000-00-00 00:00:00 diff --git a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference index 50c2edbf802..afb9a64f4fc 100644 --- a/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference +++ b/dbms/tests/integration/test_storage_kafka/test_kafka_virtual2.reference @@ -1,50 +1,50 @@ - 0 virt2 0 0 - 1 virt2 1 1 - 2 virt2 2 2 - 3 virt2 3 3 - 4 virt2 4 4 - 5 virt2 5 5 - 6 virt2 6 6 - 7 virt2 7 7 - 8 virt2 8 8 - 9 virt2 9 9 - 10 virt2 10 10 - 11 virt2 11 11 - 12 virt2 12 12 - 13 virt2 13 13 - 14 virt2 14 14 - 15 virt2 15 15 - 16 virt2 16 16 - 17 virt2 17 17 - 18 virt2 18 18 - 19 virt2 19 19 - 20 virt2 20 20 - 21 virt2 21 21 - 22 virt2 22 22 - 23 virt2 23 23 - 24 virt2 24 24 - 25 virt2 25 25 - 26 virt2 26 26 - 27 virt2 27 27 - 28 virt2 28 28 - 29 virt2 29 29 - 30 virt2 30 30 - 31 virt2 31 31 - 32 virt2 32 32 - 33 virt2 33 33 - 34 virt2 34 34 - 35 virt2 35 35 - 36 virt2 36 36 - 37 virt2 37 37 - 38 virt2 38 38 - 39 virt2 39 39 - 40 virt2 40 40 - 41 virt2 41 41 - 42 virt2 42 42 - 43 virt2 43 43 - 44 virt2 44 44 - 45 virt2 45 45 - 46 virt2 46 46 - 47 virt2 47 47 - 48 virt2 48 48 - 49 virt2 49 49 + 0 virt2 0 0 0 0000-00-00 00:00:00 + 1 virt2 1 1 0 0000-00-00 00:00:00 + 2 virt2 2 2 0 0000-00-00 00:00:00 + 3 virt2 3 3 0 0000-00-00 00:00:00 + 4 virt2 4 4 0 0000-00-00 00:00:00 + 5 virt2 5 5 0 0000-00-00 00:00:00 + 6 virt2 6 6 0 0000-00-00 00:00:00 + 7 virt2 7 7 0 0000-00-00 00:00:00 + 8 virt2 8 8 0 0000-00-00 00:00:00 + 9 virt2 9 9 0 0000-00-00 00:00:00 + 10 virt2 10 10 0 0000-00-00 00:00:00 + 11 virt2 11 11 0 0000-00-00 00:00:00 + 12 virt2 12 12 0 0000-00-00 00:00:00 + 13 virt2 13 13 0 0000-00-00 00:00:00 + 14 virt2 14 14 0 0000-00-00 00:00:00 + 15 virt2 15 15 0 0000-00-00 00:00:00 + 16 virt2 16 16 0 0000-00-00 00:00:00 + 17 virt2 17 17 0 0000-00-00 00:00:00 + 18 virt2 18 18 0 0000-00-00 00:00:00 + 19 virt2 19 19 0 0000-00-00 00:00:00 + 20 virt2 20 20 0 0000-00-00 00:00:00 + 21 virt2 21 21 0 0000-00-00 00:00:00 + 22 virt2 22 22 0 0000-00-00 00:00:00 + 23 virt2 23 23 0 0000-00-00 00:00:00 + 24 virt2 24 24 0 0000-00-00 00:00:00 + 25 virt2 25 25 0 0000-00-00 00:00:00 + 26 virt2 26 26 0 0000-00-00 00:00:00 + 27 virt2 27 27 0 0000-00-00 00:00:00 + 28 virt2 28 28 0 0000-00-00 00:00:00 + 29 virt2 29 29 0 0000-00-00 00:00:00 + 30 
virt2 30 30 0 0000-00-00 00:00:00 + 31 virt2 31 31 0 0000-00-00 00:00:00 + 32 virt2 32 32 0 0000-00-00 00:00:00 + 33 virt2 33 33 0 0000-00-00 00:00:00 + 34 virt2 34 34 0 0000-00-00 00:00:00 + 35 virt2 35 35 0 0000-00-00 00:00:00 + 36 virt2 36 36 0 0000-00-00 00:00:00 + 37 virt2 37 37 0 0000-00-00 00:00:00 + 38 virt2 38 38 0 0000-00-00 00:00:00 + 39 virt2 39 39 0 0000-00-00 00:00:00 + 40 virt2 40 40 0 0000-00-00 00:00:00 + 41 virt2 41 41 0 0000-00-00 00:00:00 + 42 virt2 42 42 0 0000-00-00 00:00:00 + 43 virt2 43 43 0 0000-00-00 00:00:00 + 44 virt2 44 44 0 0000-00-00 00:00:00 + 45 virt2 45 45 0 0000-00-00 00:00:00 + 46 virt2 46 46 0 0000-00-00 00:00:00 + 47 virt2 47 47 0 0000-00-00 00:00:00 + 48 virt2 48 48 0 0000-00-00 00:00:00 + 49 virt2 49 49 0 0000-00-00 00:00:00 From 3c227dce13e58bade4ebd191bf16ef830c5394fe Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Wed, 7 Aug 2019 13:15:25 +0300 Subject: [PATCH 4/9] fix topK and topKWeighted functions --- dbms/src/Common/SpaceSaving.h | 79 ++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/dbms/src/Common/SpaceSaving.h b/dbms/src/Common/SpaceSaving.h index 56dbf382293..e5173027fa5 100644 --- a/dbms/src/Common/SpaceSaving.h +++ b/dbms/src/Common/SpaceSaving.h @@ -113,7 +113,8 @@ public: } TKey key; - size_t slot, hash; + ssize_t slot; + size_t hash; UInt64 count; UInt64 error; }; @@ -147,15 +148,13 @@ public: void insert(const TKey & key, UInt64 increment = 1, UInt64 error = 0) { // Increase weight of a key that already exists - // It uses hashtable for both value mapping as a presence test (c_i != 0) auto hash = counter_map.hash(key); - auto it = counter_map.find(key, hash); - if (it != counter_map.end()) + auto counter = findCounter(key, hash); + if (counter) { - auto c = it->getSecond(); - c->count += increment; - c->error += error; - percolate(c); + counter->count += increment; + counter->error += error; + percolate(counter); return; } // Key doesn't exist, but can fit in the top K @@ -177,6 +176,7 @@ public: push(new Counter(arena.emplace(key), increment, error, hash)); return; } + const size_t alpha_mask = alpha_map.size() - 1; auto & alpha = alpha_map[hash & alpha_mask]; if (alpha + increment < min->count) @@ -187,22 +187,16 @@ public: // Erase the current minimum element alpha_map[min->hash & alpha_mask] = min->count; - it = counter_map.find(min->key, min->hash); - // Replace minimum with newly inserted element - if (it != counter_map.end()) - { - arena.free(min->key); - min->hash = hash; - min->key = arena.emplace(key); - min->count = alpha + increment; - min->error = alpha + error; - percolate(min); + counter_list.pop_back(); + counter = findCounter(min->key, min->hash); + counter->slot = -1; - it->getSecond() = min; - it->getFirstMutable() = min->key; - counter_map.reinsert(it, hash); - } + ++removed_keys; + if (removed_keys * 2 > counter_map.size()) + rebuildCounterMap(); + + push(new Counter(arena.emplace(key), alpha + increment, alpha + error, hash)); } /* @@ -242,7 +236,7 @@ public: // The list is sorted in descending order, we have to scan in reverse for (auto counter : boost::adaptors::reverse(rhs.counter_list)) { - if (counter_map.find(counter->key) != counter_map.end()) + if (findCounter(counter->key, counter_map.hash(counter->key))) { // Subtract m2 previously added, guaranteed not negative insert(counter->key, counter->count - m2, counter->error - m2); @@ -346,19 +340,46 @@ private: void destroyLastElement() { auto last_element = counter_list.back(); - auto cell = 
counter_map.find(last_element->key, last_element->hash); - cell->setZero(); - counter_map.reinsert(cell, last_element->hash); + last_element->slot = -1; counter_list.pop_back(); - arena.free(last_element->key); - delete last_element; + + ++removed_keys; + if (removed_keys * 2 > counter_map.size()) + rebuildCounterMap(); } - HashMap counter_map; + Counter * findCounter(const TKey & key, size_t hash) + { + auto it = counter_map.find(key, hash); + if (it == counter_map.end() || it->getSecond()->slot == -1) + return nullptr; + + return it->getSecond(); + } + + void rebuildCounterMap() + { + removed_keys = 0; + for (const auto & cell : counter_map) + { + auto counter = cell.getSecond(); + if (counter->slot == -1) + delete counter; + } + + counter_map.clear(); + for (auto counter : counter_list) + counter_map[counter->key] = counter; + } + + using CounterMap = HashMap; + + CounterMap counter_map; std::vector counter_list; std::vector alpha_map; SpaceSavingArena arena; size_t m_capacity; + size_t removed_keys = 0; }; } From 61a8abb813fceb5565d48fafb4fb829f767ce825 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Thu, 8 Aug 2019 15:42:48 +0300 Subject: [PATCH 5/9] add more tests for functions topK and topKWeighted --- .../0_stateless/00981_topK_topKWeighted.reference | 3 +++ .../queries/0_stateless/00981_topK_topKWeighted.sql | 12 ++++++++++++ 2 files changed, 15 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00981_topK_topKWeighted.reference create mode 100644 dbms/tests/queries/0_stateless/00981_topK_topKWeighted.sql diff --git a/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.reference b/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.reference new file mode 100644 index 00000000000..ca039c9c498 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.reference @@ -0,0 +1,3 @@ +['0','1','2','3','4','5','6','7','8','9'] +['0','1','2','3','4','5','6','7','8','9'] +['2999999','2999998','2999997','2999996','2999995','2999994','2999993','2999992','2999991','2999990'] diff --git a/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.sql b/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.sql new file mode 100644 index 00000000000..c1f385fae80 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS topk; + +CREATE TABLE topk (val1 String, val2 UInt32) ENGINE = MergeTree ORDER BY val1; + +INSERT INTO topk SELECT toString(number), number FROM numbers(3000000); +INSERT INTO topk SELECT toString(number % 10), 999999999 FROM numbers(1000000); + +SELECT arraySort(topK(10)(val1)) FROM topk; +SELECT arraySort(topKWeighted(10)(val1, val2)) FROM topk; +SELECT topKWeighted(10)(toString(number), number) from numbers(3000000); + +DROP TABLE topk; From a56d897c2cfc75dcd44eff5c978eaff035cf717d Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Thu, 8 Aug 2019 15:55:08 +0300 Subject: [PATCH 6/9] better performance in topK function --- dbms/src/Common/SpaceSaving.h | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/dbms/src/Common/SpaceSaving.h b/dbms/src/Common/SpaceSaving.h index e5173027fa5..832a528bd48 100644 --- a/dbms/src/Common/SpaceSaving.h +++ b/dbms/src/Common/SpaceSaving.h @@ -187,14 +187,7 @@ public: // Erase the current minimum element alpha_map[min->hash & alpha_mask] = min->count; - - counter_list.pop_back(); - counter = findCounter(min->key, min->hash); - counter->slot = -1; - - ++removed_keys; - if (removed_keys * 2 > counter_map.size()) - rebuildCounterMap(); + 
destroyLastElement(); push(new Counter(arena.emplace(key), alpha + increment, alpha + error, hash)); } From ead6336d2c1e36bb4af464ab0e515f504202cec4 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Fri, 9 Aug 2019 13:11:50 +0300 Subject: [PATCH 7/9] function topK: fix merge stage and fix memory leaks --- dbms/src/Common/SpaceSaving.h | 41 +++++++++++++------ ...=> 00981_topK_topKWeighted_long.reference} | 0 ...d.sql => 00981_topK_topKWeighted_long.sql} | 0 3 files changed, 28 insertions(+), 13 deletions(-) rename dbms/tests/queries/0_stateless/{00981_topK_topKWeighted.reference => 00981_topK_topKWeighted_long.reference} (100%) rename dbms/tests/queries/0_stateless/{00981_topK_topKWeighted.sql => 00981_topK_topKWeighted_long.sql} (100%) diff --git a/dbms/src/Common/SpaceSaving.h b/dbms/src/Common/SpaceSaving.h index 832a528bd48..da7e9293723 100644 --- a/dbms/src/Common/SpaceSaving.h +++ b/dbms/src/Common/SpaceSaving.h @@ -113,7 +113,7 @@ public: } TKey key; - ssize_t slot; + size_t slot; size_t hash; UInt64 count; UInt64 error; @@ -229,17 +229,35 @@ public: // The list is sorted in descending order, we have to scan in reverse for (auto counter : boost::adaptors::reverse(rhs.counter_list)) { - if (findCounter(counter->key, counter_map.hash(counter->key))) + size_t hash = counter_map.hash(counter->key); + if (auto current = findCounter(counter->key, hash)) { // Subtract m2 previously added, guaranteed not negative - insert(counter->key, counter->count - m2, counter->error - m2); + current->count += (counter->count - m2); + current->error += (counter->error - m2); } else { // Counters not monitored in S1 - insert(counter->key, counter->count + m1, counter->error + m1); + counter_list.push_back(new Counter(arena.emplace(counter->key), counter->count + m1, counter->error + m1, hash)); } } + + std::sort(counter_list.begin(), counter_list.end(), [](Counter * l, Counter * r) { return *l > *r; }); + + if (counter_list.size() > m_capacity) + { + for (size_t i = m_capacity; i < counter_list.size(); ++i) + { + arena.free(counter_list[i]->key); + delete counter_list[i]; + } + counter_list.resize(m_capacity); + } + + for (size_t i = 0; i < counter_list.size(); ++i) + counter_list[i]->slot = i; + rebuildCounterMap(); } std::vector topK(size_t k) const @@ -323,7 +341,10 @@ private: void destroyElements() { for (auto counter : counter_list) + { + arena.free(counter->key); delete counter; + } counter_map.clear(); counter_list.clear(); @@ -333,7 +354,8 @@ private: void destroyLastElement() { auto last_element = counter_list.back(); - last_element->slot = -1; + arena.free(last_element->key); + delete last_element; counter_list.pop_back(); ++removed_keys; @@ -344,7 +366,7 @@ private: Counter * findCounter(const TKey & key, size_t hash) { auto it = counter_map.find(key, hash); - if (it == counter_map.end() || it->getSecond()->slot == -1) + if (it == counter_map.end()) return nullptr; return it->getSecond(); @@ -353,13 +375,6 @@ private: void rebuildCounterMap() { removed_keys = 0; - for (const auto & cell : counter_map) - { - auto counter = cell.getSecond(); - if (counter->slot == -1) - delete counter; - } - counter_map.clear(); for (auto counter : counter_list) counter_map[counter->key] = counter; diff --git a/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.reference b/dbms/tests/queries/0_stateless/00981_topK_topKWeighted_long.reference similarity index 100% rename from dbms/tests/queries/0_stateless/00981_topK_topKWeighted.reference rename to 
dbms/tests/queries/0_stateless/00981_topK_topKWeighted_long.reference diff --git a/dbms/tests/queries/0_stateless/00981_topK_topKWeighted.sql b/dbms/tests/queries/0_stateless/00981_topK_topKWeighted_long.sql similarity index 100% rename from dbms/tests/queries/0_stateless/00981_topK_topKWeighted.sql rename to dbms/tests/queries/0_stateless/00981_topK_topKWeighted_long.sql From e06c994b0e97c0553dc415d1a2826d8c3b2efecc Mon Sep 17 00:00:00 2001 From: chertus Date: Fri, 9 Aug 2019 17:50:04 +0300 Subject: [PATCH 8/9] refactoring: move collectUsedColumns from ExpressionAnalyzer to SyntaxAnalyzer --- dbms/src/Interpreters/AnalyzedJoin.cpp | 2 +- dbms/src/Interpreters/AnalyzedJoin.h | 1 + dbms/src/Interpreters/ExpressionAnalyzer.cpp | 224 ++---------------- dbms/src/Interpreters/ExpressionAnalyzer.h | 32 +-- .../Interpreters/InterpreterSelectQuery.cpp | 21 +- .../src/Interpreters/MutationsInterpreter.cpp | 3 +- .../RequiredSourceColumnsVisitor.cpp | 12 +- .../RequiredSourceColumnsVisitor.h | 12 +- dbms/src/Interpreters/SyntaxAnalyzer.cpp | 174 +++++++++++++- dbms/src/Interpreters/SyntaxAnalyzer.h | 13 +- .../Interpreters/evaluateMissingDefaults.cpp | 2 +- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 6 +- .../transformQueryForExternalDatabase.cpp | 4 +- 13 files changed, 248 insertions(+), 258 deletions(-) diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp index 8357ab80aba..44e145dba8a 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.cpp +++ b/dbms/src/Interpreters/AnalyzedJoin.cpp @@ -80,7 +80,7 @@ ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions( ASTPtr query = expression_list; auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns_from_joined_table, required_columns); - ExpressionAnalyzer analyzer(query, syntax_result, context, {}, required_columns_set); + ExpressionAnalyzer analyzer(query, syntax_result, context, required_columns_set); return analyzer.getActions(true, false); } diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index d820cb0da7b..db76acd5ac3 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -33,6 +33,7 @@ struct AnalyzedJoin private: friend class SyntaxAnalyzer; + friend struct SyntaxAnalyzerResult; friend class ExpressionAnalyzer; Names key_names_left; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index fdc8226a42a..9405f7cedeb 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -58,7 +58,6 @@ #include #include #include -#include namespace DB { @@ -77,28 +76,15 @@ ExpressionAnalyzer::ExpressionAnalyzer( const ASTPtr & query_, const SyntaxAnalyzerResultPtr & syntax_analyzer_result_, const Context & context_, - const NamesAndTypesList & additional_source_columns, const NameSet & required_result_columns_, size_t subquery_depth_, bool do_global_, const SubqueriesForSets & subqueries_for_sets_) - : ExpressionAnalyzerData(syntax_analyzer_result_->source_columns, required_result_columns_, subqueries_for_sets_) + : ExpressionAnalyzerData(required_result_columns_, subqueries_for_sets_) , query(query_), context(context_), settings(context.getSettings()) , subquery_depth(subquery_depth_), do_global(do_global_) , syntax(syntax_analyzer_result_) { - storage = syntax->storage; - rewrite_subqueries = syntax->rewrite_subqueries; - - if (!additional_source_columns.empty()) - { - source_columns.insert(source_columns.end(), 
additional_source_columns.begin(), additional_source_columns.end()); - removeDuplicateColumns(source_columns); - } - - /// Delete the unnecessary from `source_columns` list. Form `columns_added_by_join`. - collectUsedColumns(); - /// external_tables, subqueries_for_sets for global subqueries. /// Replaces global subqueries with the generated names of temporary tables that will be sent to remote servers. initGlobalSubqueriesAndExternalTables(); @@ -115,7 +101,7 @@ ExpressionAnalyzer::ExpressionAnalyzer( bool ExpressionAnalyzer::isRemoteStorage() const { - return storage && storage->isRemote(); + return storage() && storage()->isRemote(); } @@ -133,7 +119,7 @@ void ExpressionAnalyzer::analyzeAggregation() if (select_query && (select_query->groupBy() || select_query->having())) has_aggregation = true; - ExpressionActionsPtr temp_actions = std::make_shared(source_columns, context); + ExpressionActionsPtr temp_actions = std::make_shared(sourceColumns(), context); if (select_query) { @@ -256,7 +242,7 @@ void ExpressionAnalyzer::makeSetsForIndex() { const auto * select_query = query->as(); - if (storage && select_query && storage->supportsIndexForIn()) + if (storage() && select_query && storage()->supportsIndexForIn()) { if (select_query->where()) makeSetsForIndexImpl(select_query->where()); @@ -312,7 +298,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node) { const IAST & args = *func->arguments; - if (storage && storage->mayBenefitFromIndexForIn(args.children.at(0), context)) + if (storage() && storage()->mayBenefitFromIndexForIn(args.children.at(0), context)) { const ASTPtr & arg = args.children.at(1); if (arg->as() || arg->as()) @@ -322,9 +308,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node) } else { - NamesAndTypesList temp_columns = source_columns; + NamesAndTypesList temp_columns = sourceColumns(); temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); - for (const auto & joined_column : columns_added_by_join) + for (const auto & joined_column : columnsAddedByJoin()) temp_columns.push_back(joined_column); ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, context); getRootActions(func->arguments->children.at(0), true, temp_actions); @@ -343,7 +329,7 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries, { LogAST log; ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth, - source_columns, actions, prepared_sets, subqueries_for_sets, + sourceColumns(), actions, prepared_sets, subqueries_for_sets, no_subqueries, only_consts, !isRemoteStorage(), log.stream()); actions_visitor.visit(ast); actions = actions_visitor.popActionsLevel(); @@ -356,7 +342,7 @@ void ExpressionAnalyzer::getActionsFromJoinKeys(const ASTTableJoin & table_join, LogAST log; ActionsVisitor actions_visitor(context, settings.size_limits_for_set, subquery_depth, - source_columns, actions, prepared_sets, subqueries_for_sets, + sourceColumns(), actions, prepared_sets, subqueries_for_sets, no_subqueries, only_consts, !isRemoteStorage(), log.stream()); if (table_join.using_expression_list) @@ -494,7 +480,7 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on if (!array_join_expression_list) return false; - initChain(chain, source_columns); + initChain(chain, sourceColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); getRootActions(array_join_expression_list, only_types, step.actions); @@ -507,12 +493,12 @@ bool 
ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only_types) const { if (only_types) - actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzedJoin().key_names_left, columns_added_by_join)); + actions->add(ExpressionAction::ordinaryJoin(nullptr, analyzedJoin().key_names_left, columnsAddedByJoin())); else for (auto & subquery_for_set : subqueries_for_sets) if (subquery_for_set.second.join) actions->add(ExpressionAction::ordinaryJoin(subquery_for_set.second.join, analyzedJoin().key_names_left, - columns_added_by_join)); + columnsAddedByJoin())); } static void appendRequiredColumns( @@ -536,7 +522,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty if (!select_query->join()) return false; - initChain(chain, source_columns); + initChain(chain, sourceColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); const auto & join_element = select_query->join()->as(); @@ -588,7 +574,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty auto & analyzed_join = analyzedJoin(); /// Actions which need to be calculated on joined block. ExpressionActionsPtr joined_block_actions = - analyzed_join.createJoinedBlockActions(columns_added_by_join, select_query, context); + analyzed_join.createJoinedBlockActions(columnsAddedByJoin(), select_query, context); /** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs * - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1, @@ -610,7 +596,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty NameSet required_columns(action_columns.begin(), action_columns.end()); appendRequiredColumns( - required_columns, joined_block_actions->getSampleBlock(), analyzed_join.key_names_right, columns_added_by_join); + required_columns, joined_block_actions->getSampleBlock(), analyzed_join.key_names_right, columnsAddedByJoin()); auto original_map = analyzed_join.getOriginalColumnsMap(required_columns); Names original_columns; @@ -647,7 +633,7 @@ bool ExpressionAnalyzer::appendPrewhere( if (!select_query->prewhere()) return false; - initChain(chain, source_columns); + initChain(chain, sourceColumns()); auto & step = chain.getLastStep(); getRootActions(select_query->prewhere(), only_types, step.actions); String prewhere_column_name = select_query->prewhere()->getColumnName(); @@ -656,7 +642,7 @@ bool ExpressionAnalyzer::appendPrewhere( { /// Remove unused source_columns from prewhere actions. 
- auto tmp_actions = std::make_shared(source_columns, context); + auto tmp_actions = std::make_shared(sourceColumns(), context); getRootActions(select_query->prewhere(), only_types, tmp_actions); tmp_actions->finalize({prewhere_column_name}); auto required_columns = tmp_actions->getRequiredColumns(); @@ -676,7 +662,7 @@ bool ExpressionAnalyzer::appendPrewhere( auto names = step.actions->getSampleBlock().getNames(); NameSet name_set(names.begin(), names.end()); - for (const auto & column : source_columns) + for (const auto & column : sourceColumns()) if (required_source_columns.count(column.name) == 0) name_set.erase(column.name); @@ -697,7 +683,7 @@ bool ExpressionAnalyzer::appendPrewhere( NameSet prewhere_input_names(required_columns.begin(), required_columns.end()); NameSet unused_source_columns; - for (const auto & column : source_columns) + for (const auto & column : sourceColumns()) { if (prewhere_input_names.count(column.name) == 0) { @@ -722,7 +708,7 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t if (!select_query->where()) return false; - initChain(chain, source_columns); + initChain(chain, sourceColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); step.required_output.push_back(select_query->where()->getColumnName()); @@ -742,7 +728,7 @@ bool ExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only if (!select_query->groupBy()) return false; - initChain(chain, source_columns); + initChain(chain, sourceColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); ASTs asts = select_query->groupBy()->children; @@ -761,7 +747,7 @@ void ExpressionAnalyzer::appendAggregateFunctionsArguments(ExpressionActionsChai assertAggregation(); - initChain(chain, source_columns); + initChain(chain, sourceColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); for (size_t i = 0; i < aggregate_descriptions.size(); ++i) @@ -899,7 +885,7 @@ void ExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & chain) con void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types) { - initChain(chain, source_columns); + initChain(chain, sourceColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); getRootActions(expr, only_types, step.actions); step.required_output.push_back(expr->getColumnName()); @@ -921,7 +907,7 @@ void ExpressionAnalyzer::getActionsBeforeAggregation(const ASTPtr & ast, Express ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) { - ExpressionActionsPtr actions = std::make_shared(source_columns, context); + ExpressionActionsPtr actions = std::make_shared(sourceColumns(), context); NamesWithAliases result_columns; Names result_names; @@ -956,7 +942,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje if (!(add_aliases && project_result)) { /// We will not delete the original columns. - for (const auto & column_name_type : source_columns) + for (const auto & column_name_type : sourceColumns()) result_names.push_back(column_name_type.name); } @@ -982,164 +968,4 @@ void ExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptio aggregates = aggregate_descriptions; } -void ExpressionAnalyzer::collectUsedColumns() -{ - /** Calculate which columns are required to execute the expression. - * Then, delete all other columns from the list of available columns. 
- * After execution, columns will only contain the list of columns needed to read from the table. - */ - - RequiredSourceColumnsVisitor::Data columns_context; - RequiredSourceColumnsVisitor(columns_context).visit(query); - - NameSet source_column_names; - for (const auto & column : source_columns) - source_column_names.insert(column.name); - - NameSet required = columns_context.requiredColumns(); - - if (columns_context.has_table_join) - { - const AnalyzedJoin & analyzed_join = analyzedJoin(); - NameSet avaliable_columns; - for (const auto & name : source_columns) - avaliable_columns.insert(name.name); - - /// Add columns obtained by JOIN (if needed). - columns_added_by_join.clear(); - for (const auto & joined_column : analyzed_join.available_joined_columns) - { - auto & name = joined_column.name; - if (avaliable_columns.count(name)) - continue; - - if (required.count(name)) - { - /// Optimisation: do not add columns needed only in JOIN ON section. - if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name)) - columns_added_by_join.push_back(joined_column); - required.erase(name); - } - } - } - - NameSet array_join_sources; - if (columns_context.has_array_join) - { - /// Insert the columns required for the ARRAY JOIN calculation into the required columns list. - for (const auto & result_source : syntax->array_join_result_to_source) - array_join_sources.insert(result_source.second); - - for (const auto & column_name_type : source_columns) - if (array_join_sources.count(column_name_type.name)) - required.insert(column_name_type.name); - } - - const auto * select_query = query->as(); - - /// You need to read at least one column to find the number of rows. - if (select_query && required.empty()) - { - /// We will find a column with minimum . - /// Because it is the column that is cheapest to read. - struct ColumnSizeTuple - { - size_t compressed_size; - size_t type_size; - size_t uncompressed_size; - String name; - bool operator<(const ColumnSizeTuple & that) const - { - return std::tie(compressed_size, type_size, uncompressed_size) - < std::tie(that.compressed_size, that.type_size, that.uncompressed_size); - } - }; - std::vector columns; - if (storage) - { - auto column_sizes = storage->getColumnSizes(); - for (auto & source_column : source_columns) - { - auto c = column_sizes.find(source_column.name); - if (c == column_sizes.end()) - continue; - size_t type_size = source_column.type->haveMaximumSizeOfValue() ? source_column.type->getMaximumSizeOfValueInMemory() : 100; - columns.emplace_back(ColumnSizeTuple{c->second.data_compressed, type_size, c->second.data_uncompressed, source_column.name}); - } - } - if (columns.size()) - required.insert(std::min_element(columns.begin(), columns.end())->name); - else - /// If we have no information about columns sizes, choose a column of minimum size of its data type. - required.insert(ExpressionActions::getSmallestColumn(source_columns)); - } - - NameSet unknown_required_source_columns = required; - - for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();) - { - const String & column_name = it->name; - unknown_required_source_columns.erase(column_name); - - if (!required.count(column_name)) - source_columns.erase(it++); - else - ++it; - } - - /// If there are virtual columns among the unknown columns. Remove them from the list of unknown and add - /// in columns list, so that when further processing they are also considered. 
- if (storage) - { - for (auto it = unknown_required_source_columns.begin(); it != unknown_required_source_columns.end();) - { - if (storage->hasColumn(*it)) - { - source_columns.push_back(storage->getColumn(*it)); - unknown_required_source_columns.erase(it++); - } - else - ++it; - } - } - - if (!unknown_required_source_columns.empty()) - { - std::stringstream ss; - ss << "Missing columns:"; - for (const auto & name : unknown_required_source_columns) - ss << " '" << name << "'"; - ss << " while processing query: '" << query << "'"; - - ss << ", required columns:"; - for (const auto & name : columns_context.requiredColumns()) - ss << " '" << name << "'"; - - if (!source_column_names.empty()) - { - ss << ", source columns:"; - for (const auto & name : source_column_names) - ss << " '" << name << "'"; - } - else - ss << ", no source columns"; - - if (columns_context.has_table_join) - { - ss << ", joined columns:"; - for (const auto & column : analyzedJoin().available_joined_columns) - ss << " '" << column.name << "'"; - } - - if (!array_join_sources.empty()) - { - ss << ", arrayJoin columns:"; - for (const auto & name : array_join_sources) - ss << " '" << name << "'"; - } - - throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER); - } -} - } diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 644d10da1be..3bb8a2bab07 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -29,13 +29,8 @@ struct SyntaxAnalyzerResult; using SyntaxAnalyzerResultPtr = std::shared_ptr; /// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately. -/// If you are not writing a test you probably don't need it. Use ExpressionAnalyzer itself. struct ExpressionAnalyzerData { - /// Original columns. - /// First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted. - NamesAndTypesList source_columns; - /// If non-empty, ignore all expressions in not from this list. NameSet required_result_columns; @@ -58,15 +53,10 @@ struct ExpressionAnalyzerData /// Predicate optimizer overrides the sub queries bool rewrite_subqueries = false; - /// Columns will be added to block by join. - NamesAndTypesList columns_added_by_join; /// Subset of analyzed_join.available_joined_columns - protected: - ExpressionAnalyzerData(const NamesAndTypesList & source_columns_, - const NameSet & required_result_columns_, + ExpressionAnalyzerData(const NameSet & required_result_columns_, const SubqueriesForSets & subqueries_for_sets_) - : source_columns(source_columns_), - required_result_columns(required_result_columns_), + : required_result_columns(required_result_columns_), subqueries_for_sets(subqueries_for_sets_) {} }; @@ -102,7 +92,6 @@ public: const ASTPtr & query_, const SyntaxAnalyzerResultPtr & syntax_analyzer_result_, const Context & context_, - const NamesAndTypesList & additional_source_columns = {}, const NameSet & required_result_columns_ = {}, size_t subquery_depth_ = 0, bool do_global_ = false, @@ -114,11 +103,6 @@ public: /// Get a list of aggregation keys and descriptions of aggregate functions if the query contains GROUP BY. void getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates) const; - /** Get a set of columns that are enough to read from the table to evaluate the expression. - * Columns added from another table by JOIN are not counted. 
- */ - Names getRequiredSourceColumns() const { return source_columns.getNames(); } - /** These methods allow you to build a chain of transformations over a block, that receives values in the desired sections of the query. * * Example usage: @@ -182,25 +166,21 @@ public: /// Create Set-s that we can from IN section to use the index on them. void makeSetsForIndex(); - bool isRewriteSubqueriesPredicate() { return rewrite_subqueries; } - bool hasGlobalSubqueries() { return has_global_subqueries; } private: ASTPtr query; const Context & context; const ExtractedSettings settings; - StoragePtr storage; /// The main table in FROM clause, if exists. size_t subquery_depth; bool do_global; /// Do I need to prepare for execution global subqueries when analyzing the query. SyntaxAnalyzerResultPtr syntax; - const AnalyzedJoin & analyzedJoin() const { return syntax->analyzed_join; } - /** Remove all unnecessary columns from the list of all available columns of the table (`columns`). - * At the same time, form a set of columns added by JOIN (`columns_added_by_join`). - */ - void collectUsedColumns(); + const StoragePtr & storage() const { return syntax->storage; } /// The main table in FROM clause, if exists. + const AnalyzedJoin & analyzedJoin() const { return syntax->analyzed_join; } + const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; } + const NamesAndTypesList & columnsAddedByJoin() const { return syntax->columns_added_by_join; } /// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables. void initGlobalSubqueriesAndExternalTables(); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 9682d0e29e4..0335fbd2a81 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -292,9 +292,9 @@ InterpreterSelectQuery::InterpreterSelectQuery( table_lock = storage->lockStructureForShare(false, context.getCurrentQueryId()); syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( - query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage); + query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList()); query_analyzer = std::make_unique( - query_ptr, syntax_analyzer_result, context, NamesAndTypesList(), + query_ptr, syntax_analyzer_result, context, NameSet(required_result_column_names.begin(), required_result_column_names.end()), options.subquery_depth, !options.only_analyze); @@ -317,7 +317,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( if (!options.only_analyze || options.modify_inplace) { - if (query_analyzer->isRewriteSubqueriesPredicate()) + if (syntax_analyzer_result->rewrite_subqueries) { /// remake interpreter_subquery when PredicateOptimizer rewrites subqueries and main table is subquery if (is_subquery) @@ -336,7 +336,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( interpreter_subquery->ignoreWithTotals(); } - required_columns = query_analyzer->getRequiredSourceColumns(); + required_columns = syntax_analyzer_result->requiredSourceColumns(); if (storage) source_header = storage->getSampleBlockForColumns(required_columns); @@ -675,7 +675,16 @@ static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, cons size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size()); auto order_by_expr = query.orderBy(); - auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, 
merge_tree.getColumns().getAllPhysical()); + SyntaxAnalyzerResultPtr syntax_result; + try + { + syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAllPhysical()); + } + catch (const Exception &) + { + return {}; + } + for (size_t i = 0; i < prefix_size; ++i) { /// Read in pk order in case of exact match with order key element @@ -789,7 +798,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS /// Try transferring some condition from WHERE to PREWHERE if enabled and viable if (settings.optimize_move_to_prewhere && query.where() && !query.prewhere() && !query.final()) - MergeTreeWhereOptimizer{current_info, context, merge_tree, query_analyzer->getRequiredSourceColumns(), log}; + MergeTreeWhereOptimizer{current_info, context, merge_tree, syntax_analyzer_result->requiredSourceColumns(), log}; }; if (const MergeTreeData * merge_tree_data = dynamic_cast(storage.get())) diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 69339f66712..b5a60666ae0 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -186,8 +186,7 @@ void MutationsInterpreter::prepare(bool dry_run) { auto query = column.default_desc.expression->clone(); auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns); - ExpressionAnalyzer analyzer(query, syntax_result, context); - for (const String & dependency : analyzer.getRequiredSourceColumns()) + for (const String & dependency : syntax_result->requiredSourceColumns()) { if (updated_columns.count(dependency)) column_to_affected_materialized[dependency].push_back(column.name); diff --git a/dbms/src/Interpreters/RequiredSourceColumnsVisitor.cpp b/dbms/src/Interpreters/RequiredSourceColumnsVisitor.cpp index a6e581cb6a8..b5f9c83db50 100644 --- a/dbms/src/Interpreters/RequiredSourceColumnsVisitor.cpp +++ b/dbms/src/Interpreters/RequiredSourceColumnsVisitor.cpp @@ -40,7 +40,7 @@ static std::vector extractNamesFromLambda(const ASTFunction & node) return names; } -bool RequiredSourceColumnsMatcher::needChildVisit(ASTPtr & node, const ASTPtr & child) +bool RequiredSourceColumnsMatcher::needChildVisit(const ASTPtr & node, const ASTPtr & child) { if (child->as()) return false; @@ -60,7 +60,7 @@ bool RequiredSourceColumnsMatcher::needChildVisit(ASTPtr & node, const ASTPtr & return true; } -void RequiredSourceColumnsMatcher::visit(ASTPtr & ast, Data & data) +void RequiredSourceColumnsMatcher::visit(const ASTPtr & ast, Data & data) { /// results are columns @@ -111,7 +111,7 @@ void RequiredSourceColumnsMatcher::visit(ASTPtr & ast, Data & data) } } -void RequiredSourceColumnsMatcher::visit(ASTSelectQuery & select, const ASTPtr &, Data & data) +void RequiredSourceColumnsMatcher::visit(const ASTSelectQuery & select, const ASTPtr &, Data & data) { /// special case for top-level SELECT items: they are publics for (auto & node : select.select()->children) @@ -128,7 +128,7 @@ void RequiredSourceColumnsMatcher::visit(ASTSelectQuery & select, const ASTPtr & Visitor(data).visit(node); /// revisit select_expression_list (with children) when all the aliases are set - Visitor(data).visit(select.refSelect()); + Visitor(data).visit(select.select()); } void RequiredSourceColumnsMatcher::visit(const ASTIdentifier & node, const ASTPtr &, Data & data) @@ -158,7 +158,7 @@ void RequiredSourceColumnsMatcher::visit(const ASTFunction & node, const ASTPtr } } -void 
RequiredSourceColumnsMatcher::visit(ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data) +void RequiredSourceColumnsMatcher::visit(const ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data) { ASTTableExpression * expr = nullptr; ASTTableJoin * join = nullptr; @@ -177,7 +177,7 @@ void RequiredSourceColumnsMatcher::visit(ASTTablesInSelectQueryElement & node, c } /// ASTIdentifiers here are tables. Do not visit them as generic ones. -void RequiredSourceColumnsMatcher::visit(ASTTableExpression & node, const ASTPtr &, Data & data) +void RequiredSourceColumnsMatcher::visit(const ASTTableExpression & node, const ASTPtr &, Data & data) { if (node.database_and_table_name) data.addTableAliasIfAny(*node.database_and_table_name); diff --git a/dbms/src/Interpreters/RequiredSourceColumnsVisitor.h b/dbms/src/Interpreters/RequiredSourceColumnsVisitor.h index ed3ec75ddc9..b42a95f29ee 100644 --- a/dbms/src/Interpreters/RequiredSourceColumnsVisitor.h +++ b/dbms/src/Interpreters/RequiredSourceColumnsVisitor.h @@ -21,19 +21,19 @@ struct ASTTableExpression; class RequiredSourceColumnsMatcher { public: - using Visitor = InDepthNodeVisitor; + using Visitor = ConstInDepthNodeVisitor; using Data = ColumnNamesContext; - static bool needChildVisit(ASTPtr & node, const ASTPtr & child); - static void visit(ASTPtr & ast, Data & data); + static bool needChildVisit(const ASTPtr & node, const ASTPtr & child); + static void visit(const ASTPtr & ast, Data & data); private: static void visit(const ASTIdentifier & node, const ASTPtr &, Data & data); static void visit(const ASTFunction & node, const ASTPtr &, Data & data); - static void visit(ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data); - static void visit(ASTTableExpression & node, const ASTPtr &, Data & data); + static void visit(const ASTTablesInSelectQueryElement & node, const ASTPtr &, Data & data); + static void visit(const ASTTableExpression & node, const ASTPtr &, Data & data); static void visit(const ASTArrayJoin & node, const ASTPtr &, Data & data); - static void visit(ASTSelectQuery & select, const ASTPtr &, Data & data); + static void visit(const ASTSelectQuery & select, const ASTPtr &, Data & data); }; /// Extracts all the information about columns and tables from ASTSelectQuery block into ColumnNamesContext object. diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.cpp b/dbms/src/Interpreters/SyntaxAnalyzer.cpp index 0cb833e9bc7..282b19991b1 100644 --- a/dbms/src/Interpreters/SyntaxAnalyzer.cpp +++ b/dbms/src/Interpreters/SyntaxAnalyzer.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -44,6 +45,7 @@ namespace ErrorCodes extern const int INVALID_JOIN_ON_EXPRESSION; extern const int EMPTY_LIST_OF_COLUMNS_QUERIED; extern const int NOT_IMPLEMENTED; + extern const int UNKNOWN_IDENTIFIER; } NameSet removeDuplicateColumns(NamesAndTypesList & columns) @@ -558,12 +560,181 @@ void checkJoin(const ASTTablesInSelectQueryElement * join) } +/// Calculate which columns are required to execute the expression. +/// Then, delete all other columns from the list of available columns. +/// After execution, columns will only contain the list of columns needed to read from the table. 
diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.cpp b/dbms/src/Interpreters/SyntaxAnalyzer.cpp
index 0cb833e9bc7..282b19991b1 100644
--- a/dbms/src/Interpreters/SyntaxAnalyzer.cpp
+++ b/dbms/src/Interpreters/SyntaxAnalyzer.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include

@@ -44,6 +45,7 @@ namespace ErrorCodes
     extern const int INVALID_JOIN_ON_EXPRESSION;
     extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
     extern const int NOT_IMPLEMENTED;
+    extern const int UNKNOWN_IDENTIFIER;
 }

 NameSet removeDuplicateColumns(NamesAndTypesList & columns)
@@ -558,12 +560,181 @@ void checkJoin(const ASTTablesInSelectQueryElement * join)
 }

+/// Calculate which columns are required to execute the expression.
+/// Then, delete all other columns from the list of available columns.
+/// After execution, columns will only contain the list of columns needed to read from the table.
+void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns)
+{
+    /// We calculate required_source_columns with source_columns modifications and swap them on exit
+    required_source_columns = source_columns;
+
+    if (!additional_source_columns.empty())
+    {
+        source_columns.insert(source_columns.end(), additional_source_columns.begin(), additional_source_columns.end());
+        removeDuplicateColumns(source_columns);
+    }
+
+    RequiredSourceColumnsVisitor::Data columns_context;
+    RequiredSourceColumnsVisitor(columns_context).visit(query);
+
+    NameSet source_column_names;
+    for (const auto & column : source_columns)
+        source_column_names.insert(column.name);
+
+    NameSet required = columns_context.requiredColumns();
+
+    if (columns_context.has_table_join)
+    {
+        NameSet available_columns;
+        for (const auto & name : source_columns)
+            available_columns.insert(name.name);
+
+        /// Add columns obtained by JOIN (if needed).
+        columns_added_by_join.clear();
+        for (const auto & joined_column : analyzed_join.available_joined_columns)
+        {
+            auto & name = joined_column.name;
+            if (available_columns.count(name))
+                continue;
+
+            if (required.count(name))
+            {
+                /// Optimisation: do not add columns needed only in JOIN ON section.
+                if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
+                    columns_added_by_join.push_back(joined_column);
+                required.erase(name);
+            }
+        }
+    }
+
+    NameSet array_join_sources;
+    if (columns_context.has_array_join)
+    {
+        /// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
+        for (const auto & result_source : array_join_result_to_source)
+            array_join_sources.insert(result_source.second);
+
+        for (const auto & column_name_type : source_columns)
+            if (array_join_sources.count(column_name_type.name))
+                required.insert(column_name_type.name);
+    }
+
+    const auto * select_query = query->as<ASTSelectQuery>();
+
+    /// You need to read at least one column to find the number of rows.
+    if (select_query && required.empty())
+    {
+        /// We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
+        /// Because it is the column that is cheapest to read.
+        struct ColumnSizeTuple
+        {
+            size_t compressed_size;
+            size_t type_size;
+            size_t uncompressed_size;
+            String name;
+
+            bool operator<(const ColumnSizeTuple & that) const
+            {
+                return std::tie(compressed_size, type_size, uncompressed_size)
+                    < std::tie(that.compressed_size, that.type_size, that.uncompressed_size);
+            }
+        };
+
+        std::vector<ColumnSizeTuple> columns;
+        if (storage)
+        {
+            auto column_sizes = storage->getColumnSizes();
+            for (auto & source_column : source_columns)
+            {
+                auto c = column_sizes.find(source_column.name);
+                if (c == column_sizes.end())
+                    continue;
+                size_t type_size = source_column.type->haveMaximumSizeOfValue() ? source_column.type->getMaximumSizeOfValueInMemory() : 100;
+                columns.emplace_back(ColumnSizeTuple{c->second.data_compressed, type_size, c->second.data_uncompressed, source_column.name});
+            }
+        }
+
+        if (columns.size())
+            required.insert(std::min_element(columns.begin(), columns.end())->name);
+        else
+            /// If we have no information about columns sizes, choose a column of minimum size of its data type.
+            required.insert(ExpressionActions::getSmallestColumn(source_columns));
+    }
+
+    NameSet unknown_required_source_columns = required;
+
+    for (NamesAndTypesList::iterator it = source_columns.begin(); it != source_columns.end();)
+    {
+        const String & column_name = it->name;
+        unknown_required_source_columns.erase(column_name);
+
+        if (!required.count(column_name))
+            source_columns.erase(it++);
+        else
+            ++it;
+    }
+
+    /// If there are virtual columns among the unknown columns, remove them from the list of unknown
+    /// columns and add them to the columns list, so that they are also considered during further processing.
+    if (storage)
+    {
+        for (auto it = unknown_required_source_columns.begin(); it != unknown_required_source_columns.end();)
+        {
+            if (storage->hasColumn(*it))
+            {
+                source_columns.push_back(storage->getColumn(*it));
+                unknown_required_source_columns.erase(it++);
+            }
+            else
+                ++it;
+        }
+    }
+
+    if (!unknown_required_source_columns.empty())
+    {
+        std::stringstream ss;
+        ss << "Missing columns:";
+        for (const auto & name : unknown_required_source_columns)
+            ss << " '" << name << "'";
+        ss << " while processing query: '" << queryToString(query) << "'";
+
+        ss << ", required columns:";
+        for (const auto & name : columns_context.requiredColumns())
+            ss << " '" << name << "'";
+
+        if (!source_column_names.empty())
+        {
+            ss << ", source columns:";
+            for (const auto & name : source_column_names)
+                ss << " '" << name << "'";
+        }
+        else
+            ss << ", no source columns";
+
+        if (columns_context.has_table_join)
+        {
+            ss << ", joined columns:";
+            for (const auto & column : analyzed_join.available_joined_columns)
+                ss << " '" << column.name << "'";
+        }
+
+        if (!array_join_sources.empty())
+        {
+            ss << ", arrayJoin columns:";
+            for (const auto & name : array_join_sources)
+                ss << " '" << name << "'";
+        }
+
+        throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
+    }
+
+    required_source_columns.swap(source_columns);
+}
+
 SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
     ASTPtr & query,
     const NamesAndTypesList & source_columns_,
     const Names & required_result_columns,
-    StoragePtr storage) const
+    StoragePtr storage,
+    const NamesAndTypesList & additional_source_columns) const
 {
     auto * select_query = query->as<ASTSelectQuery>();
     if (!storage && select_query)
@@ -669,6 +840,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
         collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls);
     }

+    result.collectUsedColumns(query, additional_source_columns);
     return std::make_shared<const SyntaxAnalyzerResult>(result);
 }
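analyze() grows a defaulted additional_source_columns parameter so a caller can mix columns that are not physically stored into the analysis before the used-column computation runs. A hedged sketch of such a call; context, query and physical_columns are assumed to be in scope, the column name is made up, and DataTypeUInt64 (a real ClickHouse type) stands in for whatever type the extra column has:

    // "_hypothetical_virtual" is an invented name used only for illustration.
    NamesAndTypesList additional = {{"_hypothetical_virtual", std::make_shared<DataTypeUInt64>()}};
    auto syntax_result = SyntaxAnalyzer(context).analyze(
        query, physical_columns, /*required_result_columns=*/{}, /*storage=*/{}, additional);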
diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.h b/dbms/src/Interpreters/SyntaxAnalyzer.h
index 5896358eb85..c7addb03526 100644
--- a/dbms/src/Interpreters/SyntaxAnalyzer.h
+++ b/dbms/src/Interpreters/SyntaxAnalyzer.h
@@ -13,8 +13,13 @@ NameSet removeDuplicateColumns(NamesAndTypesList & columns);
 struct SyntaxAnalyzerResult
 {
     StoragePtr storage;
+    AnalyzedJoin analyzed_join;

     NamesAndTypesList source_columns;
+    /// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
+    NamesAndTypesList required_source_columns;
+    /// Columns that will be added to the block by JOIN. It's a subset of analyzed_join.available_joined_columns.
+    NamesAndTypesList columns_added_by_join;

     Aliases aliases;
@@ -31,10 +36,11 @@ struct SyntaxAnalyzerResult
     /// Note: not used further.
     NameToNameMap array_join_name_to_alias;

-    AnalyzedJoin analyzed_join;
-
     /// Predicate optimizer overrides the sub queries
     bool rewrite_subqueries = false;
+
+    void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
+    Names requiredSourceColumns() const { return required_source_columns.getNames(); }
 };

 using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@@ -64,7 +70,8 @@ public:
         ASTPtr & query,
         const NamesAndTypesList & source_columns_,
         const Names & required_result_columns = {},
-        StoragePtr storage = {}) const;
+        StoragePtr storage = {},
+        const NamesAndTypesList & additional_source_columns = {}) const;

 private:
     const Context & context;
diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.cpp b/dbms/src/Interpreters/evaluateMissingDefaults.cpp
index 83af15d1924..bef41488793 100644
--- a/dbms/src/Interpreters/evaluateMissingDefaults.cpp
+++ b/dbms/src/Interpreters/evaluateMissingDefaults.cpp
@@ -61,7 +61,7 @@ void evaluateMissingDefaults(Block & block,
     auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
     auto expression_analyzer = ExpressionAnalyzer{default_expr_list, syntax_result, context};
-    auto required_source_columns = expression_analyzer.getRequiredSourceColumns();
+    auto required_source_columns = syntax_result->requiredSourceColumns();
     auto rows_was = copy_block.rows();

     // Delete all not needed columns in DEFAULT expression.
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index dfc59654629..74e5df41217 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -134,8 +134,7 @@ MergeTreeData::MergeTreeData(
             throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);

         auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical());
-        columns_required_for_sampling = ExpressionAnalyzer(sample_by_ast, syntax, global_context)
-            .getRequiredSourceColumns();
+        columns_required_for_sampling = syntax->requiredSourceColumns();
     }
     MergeTreeDataFormatVersion min_format_version(0);
     if (!date_column_name.empty())
@@ -295,8 +294,7 @@ void MergeTreeData::setPrimaryKeyIndicesAndColumns(
         if (!added_key_column_expr_list->children.empty())
         {
             auto syntax = SyntaxAnalyzer(global_context).analyze(added_key_column_expr_list, all_columns);
-            Names used_columns = ExpressionAnalyzer(added_key_column_expr_list, syntax, global_context)
-                .getRequiredSourceColumns();
+            Names used_columns = syntax->requiredSourceColumns();

             NamesAndTypesList deleted_columns;
             NamesAndTypesList added_columns;
diff --git a/dbms/src/Storages/transformQueryForExternalDatabase.cpp b/dbms/src/Storages/transformQueryForExternalDatabase.cpp
index c0fcbabba42..55a0ef95200 100644
--- a/dbms/src/Storages/transformQueryForExternalDatabase.cpp
+++ b/dbms/src/Storages/transformQueryForExternalDatabase.cpp
@@ -8,7 +8,6 @@
 #include
 #include
 #include
-#include
 #include
 #include

@@ -111,8 +110,7 @@ String transformQueryForExternalDatabase(
 {
     auto clone_query = query.clone();
     auto syntax_result = SyntaxAnalyzer(context).analyze(clone_query, available_columns);
-    ExpressionAnalyzer analyzer(clone_query, syntax_result, context);
-    const Names & used_columns = analyzer.getRequiredSourceColumns();
+    const Names used_columns = syntax_result->requiredSourceColumns();

     auto select = std::make_shared<ASTSelectQuery>();
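Patch 8 also gives unresolved identifiers a single, well-defined resolution order inside collectUsedColumns(): a required name is matched against the source columns first, then against the storage's virtual columns, and only then reported via the UNKNOWN_IDENTIFIER exception together with the required, source, joined and arrayJoin column lists. The following is a simplified, self-contained restatement of that order; std containers stand in for the real ClickHouse types:

    #include <set>
    #include <string>

    using NameSet = std::set<std::string>;

    // Returns the names that neither the table's real columns nor its virtual
    // columns can provide; in the patch these end up in the "Missing columns:"
    // text of the UNKNOWN_IDENTIFIER exception.
    NameSet findUnknownColumns(const NameSet & required,
                               const NameSet & source_columns,
                               const NameSet & virtual_columns)
    {
        NameSet unknown;
        for (const auto & name : required)
            if (!source_columns.count(name) && !virtual_columns.count(name))
                unknown.insert(name);
        return unknown;
    }

Note also that SyntaxAnalyzerResultPtr points to a const result, so the analysis outcome can be shared between consumers without any of them mutating what another reads.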
From 6491de7eddcd3097cde4a288593aab16a349c59b Mon Sep 17 00:00:00 2001
From: chertus
Date: Fri, 9 Aug 2019 19:17:01 +0300
Subject: [PATCH 9/9] forget to remove field in last patch

---
 dbms/src/Interpreters/ExpressionAnalyzer.h | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h
index 3bb8a2bab07..ed35bafbe75 100644
--- a/dbms/src/Interpreters/ExpressionAnalyzer.h
+++ b/dbms/src/Interpreters/ExpressionAnalyzer.h
@@ -50,9 +50,6 @@ struct ExpressionAnalyzerData
     /// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
     Tables external_tables;

-    /// Predicate optimizer overrides the sub queries
-    bool rewrite_subqueries = false;
-
 protected:
     ExpressionAnalyzerData(const NameSet & required_result_columns_, const SubqueriesForSets & subqueries_for_sets_)
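With this removal, rewrite_subqueries has a single owner: the copy that patch 8 left on SyntaxAnalyzerResult. A hedged usage sketch, assuming a syntax_result of type SyntaxAnalyzerResultPtr is in scope:

    // The flag is now read from the syntax analysis result, not from
    // ExpressionAnalyzerData, which no longer carries a duplicate.
    if (syntax_result->rewrite_subqueries)
    {
        // React to the predicate optimizer having rewritten subqueries,
        // e.g. skip optimizations that assumed the original query shape.
    }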