mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Merge pull request #3134 from zhang2014/fix/ISSUES-3110
ISSUES-3110 fix merge and distributed engine query stage
This commit is contained in:
commit
bf53c4a31d
@ -346,24 +346,9 @@ public:
|
||||
/// Returns sampling expression for storage or nullptr if there is no.
|
||||
virtual ASTPtr getSamplingExpression() const { return nullptr; }
|
||||
|
||||
protected:
|
||||
using ITableDeclaration::ITableDeclaration;
|
||||
using std::enable_shared_from_this<IStorage>::shared_from_this;
|
||||
|
||||
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context)
|
||||
{
|
||||
auto expected_stage = getQueryProcessingStage(context);
|
||||
checkQueryProcessingStage(processed_stage, expected_stage);
|
||||
}
|
||||
|
||||
void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum expected_stage)
|
||||
{
|
||||
if (processed_stage != expected_stage)
|
||||
throw Exception("Unexpected query processing stage for storage " + getName() +
|
||||
": expected " + QueryProcessingStage::toString(expected_stage) +
|
||||
", got " + QueryProcessingStage::toString(processed_stage), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
private:
|
||||
friend class TableStructureReadLock;
|
||||
|
||||
|
@ -269,11 +269,10 @@ BlockInputStreams StorageKafka::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
|
||||
if (num_consumers == 0)
|
||||
|
@ -136,8 +136,6 @@ BlockInputStreams StorageBuffer::read(
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
BlockInputStreams streams_from_dst;
|
||||
|
||||
if (!no_destination)
|
||||
|
@ -261,12 +261,10 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
|
||||
BlockInputStreams StorageCatBoostPool::read(const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned /*threads*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
auto stream = std::make_shared<CatBoostDatasetBlockInputStream>(
|
||||
data_description_file_name, "TSV", sample_block, context, max_block_size);
|
||||
|
||||
|
@ -38,11 +38,10 @@ BlockInputStreams StorageDictionary::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*threads*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
|
||||
return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
|
||||
}
|
||||
|
@ -249,7 +249,6 @@ BlockInputStreams StorageDistributed::read(
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
auto cluster = getCluster();
|
||||
checkQueryProcessingStage(processed_stage, getQueryProcessingStage(context, cluster));
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
|
@ -190,11 +190,10 @@ BlockInputStreams StorageFile::read(
|
||||
const Names & /*column_names*/,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
|
||||
}
|
||||
|
||||
|
@ -572,11 +572,10 @@ BlockInputStreams StorageLog::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
loadMarks();
|
||||
|
||||
|
@ -83,12 +83,11 @@ StorageMemory::StorageMemory(String table_name_, ColumnsDescription columns_desc
|
||||
BlockInputStreams StorageMemory::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const Context & /*context*/,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t /*max_block_size*/,
|
||||
unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
@ -141,7 +141,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
|
||||
auto database = context.getDatabase(source_database);
|
||||
auto iterator = database->getIterator(context);
|
||||
|
||||
bool first = true;
|
||||
size_t selected_table_size = 0;
|
||||
|
||||
while (iterator->isValid())
|
||||
{
|
||||
@ -149,23 +149,14 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
|
||||
{
|
||||
auto & table = iterator->table();
|
||||
if (table.get() != this)
|
||||
{
|
||||
auto stage = table->getQueryProcessingStage(context);
|
||||
|
||||
if (first)
|
||||
stage_in_source_tables = stage;
|
||||
else if (stage != stage_in_source_tables)
|
||||
throw Exception("Source tables for Merge table are processing data up to different stages",
|
||||
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
|
||||
|
||||
first = false;
|
||||
}
|
||||
++selected_table_size;
|
||||
}
|
||||
|
||||
iterator->next();
|
||||
}
|
||||
|
||||
return stage_in_source_tables;
|
||||
auto fetch_or_mergeable_stage = std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState);
|
||||
return selected_table_size == 1 ? stage_in_source_tables : fetch_or_mergeable_stage;
|
||||
}
|
||||
|
||||
|
||||
@ -202,12 +193,6 @@ BlockInputStreams StorageMerge::read(
|
||||
|
||||
for (const auto & elem : selected_tables)
|
||||
{
|
||||
/// Check processing stage again in case new table was added after getQueryProcessingStage call.
|
||||
auto stage = elem.first->getQueryProcessingStage(context);
|
||||
if (stage != processed_stage)
|
||||
throw Exception("Source tables for Merge table are processing data up to different stages",
|
||||
ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
|
||||
|
||||
/// If PREWHERE is used in query, you need to make sure that all tables support this.
|
||||
if (typeid_cast<const ASTSelectQuery &>(*query).prewhere_expression)
|
||||
if (!elem.first->supportsPrewhere())
|
||||
|
@ -110,11 +110,10 @@ BlockInputStreams StorageMergeTree::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
|
||||
}
|
||||
|
||||
|
@ -51,12 +51,11 @@ BlockInputStreams StorageMySQL::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned)
|
||||
{
|
||||
check(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
String query = transformQueryForExternalDatabase(
|
||||
*query_info.query, getColumns().ordinary, IdentifierQuotingStyle::Backticks, remote_database_name, remote_table_name, context);
|
||||
|
||||
|
@ -23,12 +23,11 @@ public:
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo &,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processing_stage,
|
||||
const Context & /*context*/,
|
||||
QueryProcessingStage::Enum /*processing_stage*/,
|
||||
size_t,
|
||||
unsigned) override
|
||||
{
|
||||
checkQueryProcessingStage(processing_stage, context);
|
||||
return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
|
||||
}
|
||||
|
||||
|
@ -80,7 +80,6 @@ BlockInputStreams StorageODBC::read(const Names & column_names,
|
||||
unsigned num_streams)
|
||||
{
|
||||
check(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
odbc_bridge_helper.startODBCBridgeSync();
|
||||
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
|
@ -2929,11 +2929,10 @@ BlockInputStreams StorageReplicatedMergeTree::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
/** The `select_sequential_consistency` setting has two meanings:
|
||||
|
@ -235,11 +235,10 @@ BlockInputStreams StorageStripeLog::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t /*max_block_size*/,
|
||||
unsigned num_streams)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
||||
|
||||
check(column_names);
|
||||
|
@ -384,11 +384,10 @@ BlockInputStreams StorageTinyLog::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
|
||||
max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
|
||||
|
@ -160,8 +160,6 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
|
||||
size_t max_block_size,
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
auto request_uri = uri;
|
||||
auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size);
|
||||
for (const auto & [param, value] : params)
|
||||
|
@ -41,12 +41,10 @@ BlockInputStreams StorageView::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
BlockInputStreams res;
|
||||
|
||||
ASTPtr & current_inner_query = inner_query;
|
||||
|
@ -33,12 +33,11 @@ public:
|
||||
BlockInputStreams read(const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t /*max_block_size*/,
|
||||
unsigned /*num_streams*/) override
|
||||
{
|
||||
check(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
Block sample_block = getSampleBlock();
|
||||
MutableColumns res_columns = sample_block.cloneEmptyColumns();
|
||||
|
@ -198,11 +198,10 @@ BlockInputStreams StorageSystemColumns::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
|
||||
/// Create a mask of what columns are needed in the result.
|
||||
|
@ -53,13 +53,12 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multi
|
||||
BlockInputStreams StorageSystemNumbers::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo &,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const Context & /*context*/,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
check(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
if (limit && limit < max_block_size)
|
||||
{
|
||||
|
@ -20,13 +20,12 @@ StorageSystemOne::StorageSystemOne(const std::string & name_)
|
||||
BlockInputStreams StorageSystemOne::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo &,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const Context & /*context*/,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
check(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(
|
||||
Block{ColumnWithTypeAndName(
|
||||
|
@ -237,12 +237,11 @@ BlockInputStreams StorageSystemPartsBase::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
bool has_state_column = hasStateColumn(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
StoragesInfoStream stream(query_info, context, has_state_column);
|
||||
|
||||
|
@ -55,12 +55,11 @@ BlockInputStreams StorageSystemReplicas::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
check(column_names);
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
|
||||
/// We collect a set of replicated tables.
|
||||
std::map<String, std::map<String, StoragePtr>> replicated_tables;
|
||||
|
@ -258,11 +258,10 @@ BlockInputStreams StorageSystemTables::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
checkQueryProcessingStage(processed_stage, context);
|
||||
check(column_names);
|
||||
|
||||
/// Create a mask of what columns are needed in the result.
|
||||
|
@ -0,0 +1,2 @@
|
||||
300
|
||||
300
|
@ -0,0 +1,20 @@
|
||||
DROP TABLE IF EXISTS test.test_local_1;
|
||||
DROP TABLE IF EXISTS test.test_local_2;
|
||||
DROP TABLE IF EXISTS test.test_distributed_1;
|
||||
DROP TABLE IF EXISTS test.test_distributed_2;
|
||||
|
||||
CREATE TABLE test.test_local_1 (date Date, value UInt32) ENGINE = MergeTree(date, date, 8192);
|
||||
CREATE TABLE test.test_local_2 (date Date, value UInt32) ENGINE = MergeTree(date, date, 8192);
|
||||
CREATE TABLE test.test_distributed_1 AS test.test_local_1 ENGINE = Distributed('test_shard_localhost', 'test', test_local_1, rand());
|
||||
CREATE TABLE test.test_distributed_2 AS test.test_local_2 ENGINE = Distributed('test_shard_localhost', 'test', test_local_2, rand());
|
||||
|
||||
INSERT INTO test.test_local_1 VALUES ('2018-08-01',100);
|
||||
INSERT INTO test.test_local_2 VALUES ('2018-08-01',200);
|
||||
|
||||
SELECT sum(value) FROM merge('test', 'test_local_1|test_distributed_2');
|
||||
SELECT sum(value) FROM merge('test', 'test_distributed_1|test_distributed_2');
|
||||
|
||||
DROP TABLE IF EXISTS test.test_local_1;
|
||||
DROP TABLE IF EXISTS test.test_local_2;
|
||||
DROP TABLE IF EXISTS test.test_distributed_1;
|
||||
DROP TABLE IF EXISTS test.test_distributed_2;
|
Loading…
Reference in New Issue
Block a user