mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 12:22:12 +00:00
Merge pull request #9387 from ClickHouse/fix_another_context_related_segfault
Fix Context-related segfault in StorageMerge and destruction order in BlockIO
This commit is contained in: commit b3dfdbc1a0
dbms/src/DataStreams/BlockIO.cpp (new file, 56 lines)
@@ -0,0 +1,56 @@
+#include <DataStreams/BlockIO.h>
+#include <Interpreters/ProcessList.h>
+
+namespace DB
+{
+
+void BlockIO::reset()
+{
+    /** process_list_entry should be destroyed after in, after out and after pipeline,
+      * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
+      * which could be used before destroying of in and out.
+      *
+      * However, QueryStatus inside process_list_entry holds shared pointers to streams for some reason.
+      * Streams must be destroyed before storage locks, storages and contexts inside pipeline,
+      * so releaseQueryStreams() is required.
+      */
+    /// TODO simplify it all
+
+    out.reset();
+    in.reset();
+    if (process_list_entry)
+        process_list_entry->get().releaseQueryStreams();
+    pipeline = QueryPipeline();
+    process_list_entry.reset();
+
+    /// TODO Do we need also reset callbacks? In which order?
+}
+
+BlockIO & BlockIO::operator= (BlockIO && rhs)
+{
+    if (this == &rhs)
+        return *this;
+
+    /// Explicitly reset fields, so everything is destructed in right order
+    reset();
+
+    process_list_entry = std::move(rhs.process_list_entry);
+    in = std::move(rhs.in);
+    out = std::move(rhs.out);
+    pipeline = std::move(rhs.pipeline);
+
+    finish_callback = std::move(rhs.finish_callback);
+    exception_callback = std::move(rhs.exception_callback);
+
+    null_format = std::move(rhs.null_format);
+
+    return *this;
+}
+
+BlockIO::~BlockIO()
+{
+    reset();
+}
+
+}
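The comment in reset() is the crux of the BlockIO change: process_list_entry owns state (such as the query-level MemoryTracker) that in, out and pipeline still touch while they are being destroyed, so the teardown order has to be enforced by hand. A minimal sketch of that failure mode and of the explicit-reset fix, using invented stand-in types (Entry, Stream, IO) rather than the real ClickHouse classes:

#include <iostream>
#include <memory>
#include <string>

// Invented stand-ins: Entry plays the role of process_list_entry,
// Stream plays the role of in/out and holds a non-owning pointer into Entry.
struct Entry
{
    std::string name = "query-level state";
    ~Entry() { std::cout << "Entry destroyed\n"; }
};

struct Stream
{
    const Entry * entry = nullptr;   // non-owning pointer into Entry
    ~Stream()
    {
        // If Entry were already destroyed, this would read freed memory.
        if (entry)
            std::cout << "Stream flushed using " << entry->name << "\n";
    }
};

struct IO
{
    std::shared_ptr<Entry> entry;    // must be destroyed last
    std::shared_ptr<Stream> stream;  // still uses entry while being destroyed

    void reset()
    {
        // Destroy the user of the state first, then the state itself,
        // mirroring the order BlockIO::reset() enforces by hand.
        stream.reset();
        entry.reset();
    }

    ~IO() { reset(); }
};

int main()
{
    IO io;
    io.entry = std::make_shared<Entry>();
    io.stream = std::make_shared<Stream>();
    io.stream->entry = io.entry.get();
}   // prints "Stream flushed using query-level state", then "Entry destroyed"

Reversing the two reset() calls in the sketch would make ~Stream read an already-destroyed Entry, which is the class of bug the explicit ordering in BlockIO::reset() guards against.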
dbms/src/DataStreams/BlockIO.h
@@ -15,13 +15,14 @@ class ProcessListEntry;
 struct BlockIO
 {
     BlockIO() = default;
-    BlockIO(const BlockIO &) = default;
-    ~BlockIO() = default;
+    BlockIO(BlockIO &&) = default;
+
+    BlockIO & operator= (BlockIO && rhs);
+    ~BlockIO();
+
+    BlockIO(const BlockIO &) = delete;
+    BlockIO & operator= (const BlockIO & rhs) = delete;
 
-    /** process_list_entry should be destroyed after in, after out and after pipeline,
-      * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
-      * which could be used before destroying of in and out.
-      */
     std::shared_ptr<ProcessListEntry> process_list_entry;
 
     BlockOutputStreamPtr out;
@@ -49,28 +50,8 @@ struct BlockIO
             exception_callback();
     }
 
-    BlockIO & operator= (const BlockIO & rhs)
-    {
-        if (this == &rhs)
-            return *this;
-
-        out.reset();
-        in.reset();
-        pipeline = QueryPipeline();
-        process_list_entry.reset();
-
-        process_list_entry = rhs.process_list_entry;
-        in = rhs.in;
-        out = rhs.out;
-        pipeline = rhs.pipeline;
-
-        finish_callback = rhs.finish_callback;
-        exception_callback = rhs.exception_callback;
-
-        null_format = rhs.null_format;
-
-        return *this;
-    }
+private:
+    void reset();
 };
 
 }
dbms/src/Interpreters/executeQuery.cpp
@@ -555,7 +555,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
         throw;
     }
 
-    return std::make_tuple(ast, res);
+    return std::make_tuple(ast, std::move(res));
 }
 
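Since BlockIO is now move-only (its copy constructor and copy assignment are deleted in the header above), `res` can no longer be copied into the result tuple; it has to be moved. A small illustrative sketch with a stand-in MoveOnly type, not the real BlockIO:

#include <tuple>
#include <utility>

struct MoveOnly
{
    MoveOnly() = default;
    MoveOnly(MoveOnly &&) = default;
    MoveOnly & operator=(MoveOnly &&) = default;
    MoveOnly(const MoveOnly &) = delete;             // like BlockIO after this commit
    MoveOnly & operator=(const MoveOnly &) = delete;
};

std::tuple<int, MoveOnly> make()
{
    MoveOnly res;
    // return std::make_tuple(1, res);           // error: copy constructor is deleted
    return std::make_tuple(1, std::move(res));   // OK: the tuple takes ownership by move
}

int main()
{
    auto t = make();
    (void)t;
}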
dbms/src/Processors/QueryPipeline.cpp
@@ -675,4 +675,31 @@ PipelineExecutorPtr QueryPipeline::execute()
     return std::make_shared<PipelineExecutor>(processors, process_list_element);
 }
 
+QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs)
+{
+    /// Reset primitive fields
+    process_list_element = rhs.process_list_element;
+    rhs.process_list_element = nullptr;
+    max_threads = rhs.max_threads;
+    rhs.max_threads = 0;
+    output_format = rhs.output_format;
+    rhs.output_format = nullptr;
+    has_resize = rhs.has_resize;
+    rhs.has_resize = false;
+    extremes_port = rhs.extremes_port;
+    rhs.extremes_port = nullptr;
+    totals_having_port = rhs.totals_having_port;
+    rhs.totals_having_port = nullptr;
+
+    /// Move these fields in destruction order (it's important)
+    streams = std::move(rhs.streams);
+    processors = std::move(rhs.processors);
+    current_header = std::move(rhs.current_header);
+    table_locks = std::move(rhs.table_locks);
+    storage_holders = std::move(rhs.storage_holders);
+    interpreter_context = std::move(rhs.interpreter_context);
+
+    return *this;
+}
+
 }
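The new move assignment follows the usual hand-written pattern: primitive and raw-pointer fields are copied and then cleared on `rhs` so the moved-from pipeline is left empty, while owning members are moved in an order chosen to match their destruction order. A simplified, hypothetical sketch of that pattern (the field names here are illustrative, not the real QueryPipeline members):

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Pipeline
{
    // Raw pointer / primitive state: must be reset on rhs by hand.
    int * element = nullptr;
    std::size_t max_threads = 0;

    // Owning members; moving them leaves rhs empty automatically.
    std::vector<std::shared_ptr<int>> contexts;
    std::vector<int> processors;

    Pipeline() = default;
    Pipeline(Pipeline &&) = default;

    Pipeline & operator=(Pipeline && rhs)
    {
        // Copy primitives, then null them out on rhs so it no longer
        // refers to state it does not own.
        element = rhs.element;
        rhs.element = nullptr;
        max_threads = rhs.max_threads;
        rhs.max_threads = 0;

        // Owning members can simply be moved.
        processors = std::move(rhs.processors);
        contexts = std::move(rhs.contexts);
        return *this;
    }
};

int main()
{
    Pipeline a, b;
    b.max_threads = 8;
    a = std::move(b);   // a.max_threads == 8, b.max_threads == 0
}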
dbms/src/Processors/QueryPipeline.h
@@ -23,6 +23,12 @@ class QueryPipeline
 {
 public:
     QueryPipeline() = default;
+    QueryPipeline(QueryPipeline &&) = default;
+    ~QueryPipeline() = default;
+    QueryPipeline(const QueryPipeline &) = delete;
+    QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
+
+    QueryPipeline & operator= (QueryPipeline && rhs);
 
     /// All pipes must have same header.
     void init(Pipes pipes);
@@ -97,6 +103,17 @@ public:
     Pipe getPipe() &&;
 
 private:
+    /// Destruction order: processors, header, locks, temporary storages, local contexts
+
+    /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
+    /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
+    /// because QueryPipeline is alive until query is finished.
+    std::vector<std::shared_ptr<Context>> interpreter_context;
+    std::vector<StoragePtr> storage_holders;
+    TableStructureReadLocks table_locks;
+
+    /// Common header for each stream.
+    Block current_header;
 
     /// All added processors.
     Processors processors;
@@ -111,17 +128,6 @@ private:
     /// If resize processor was added to pipeline.
     bool has_resize = false;
 
-    /// Common header for each stream.
-    Block current_header;
-
-    TableStructureReadLocks table_locks;
-
-    /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
-    /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
-    /// because QueryPipeline is alive until query is finished.
-    std::vector<std::shared_ptr<Context>> interpreter_context;
-    std::vector<StoragePtr> storage_holders;
-
     IOutputFormat * output_format = nullptr;
 
     size_t max_threads = 0;
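Moving interpreter_context, storage_holders and table_locks to the top of the private section is what makes the "destruction order" comment hold: C++ destroys non-static data members in reverse order of declaration, so the fields declared first are destroyed last. A tiny sketch of that rule with invented member types:

#include <iostream>

struct Logger
{
    const char * name;
    explicit Logger(const char * n) : name(n) {}
    ~Logger() { std::cout << "destroying " << name << "\n"; }
};

struct Pipeline
{
    // Declared first => destroyed last, so everything below may still
    // rely on it while being torn down.
    Logger interpreter_context { "interpreter_context" };
    Logger table_locks         { "table_locks" };
    Logger processors          { "processors" };
};

int main()
{
    Pipeline p;
}
// Output:
//   destroying processors
//   destroying table_locks
//   destroying interpreter_context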
dbms/src/Storages/StorageMerge.cpp
@@ -185,8 +185,8 @@ Pipes StorageMerge::read(
     /** Just in case, turn off optimization "transfer to PREWHERE",
      * since there is no certainty that it works when one of table is MergeTree and other is not.
      */
-    Context modified_context = context;
-    modified_context.getSettingsRef().optimize_move_to_prewhere = false;
+    auto modified_context = std::make_shared<Context>(context);
+    modified_context->getSettingsRef().optimize_move_to_prewhere = false;
 
     /// What will be result structure depending on query processed stage in source tables?
     Block header = getQueryHeader(column_names, query_info, context, processed_stage);
@@ -255,7 +255,7 @@ Pipes StorageMerge::read(
 Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
     const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
     Names & real_column_names,
-    Context & modified_context, size_t streams_num, bool has_table_virtual_column,
+    const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
     bool concat_streams)
 {
     auto & [storage, struct_lock, table_name] = storage_with_lock;
@@ -272,38 +272,38 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         {
             /// This flag means that pipeline must be tree-shaped,
             /// so we can't enable processors for InterpreterSelectQuery here.
-            auto stream = InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
+            auto stream = InterpreterSelectQuery(modified_query_info.query, *modified_context, std::make_shared<OneBlockInputStream>(header),
                 SelectQueryOptions(processed_stage).analyze()).execute().in;
 
             pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
             return pipes;
         }
 
-        pipes.emplace_back(
-            InterpreterSelectQuery(modified_query_info.query, modified_context,
-                std::make_shared<OneBlockInputStream>(header),
-                SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe());
+        auto pipe = InterpreterSelectQuery(modified_query_info.query, *modified_context,
+            std::make_shared<OneBlockInputStream>(header),
+            SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe();
+        pipe.addInterpreterContext(modified_context);
+        pipes.emplace_back(std::move(pipe));
         return pipes;
     }
 
-    if (processed_stage <= storage->getQueryProcessingStage(modified_context))
+    if (processed_stage <= storage->getQueryProcessingStage(*modified_context))
     {
         /// If there are only virtual columns in query, you must request at least one other column.
         if (real_column_names.empty())
             real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
 
-        pipes = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, UInt32(streams_num));
+        pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
     }
-    else if (processed_stage > storage->getQueryProcessingStage(modified_context))
+    else if (processed_stage > storage->getQueryProcessingStage(*modified_context))
     {
         modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);
 
         /// Maximum permissible parallelism is streams_num
-        modified_context.getSettingsRef().max_threads = UInt64(streams_num);
-        modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
+        modified_context->getSettingsRef().max_threads = UInt64(streams_num);
+        modified_context->getSettingsRef().max_streams_to_max_threads_ratio = 1;
 
-        InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
+        InterpreterSelectQuery interpreter{modified_query_info.query, *modified_context, SelectQueryOptions(processed_stage)};
 
         if (query_info.force_tree_shaped_pipeline)
         {
@@ -343,9 +343,11 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
 
         /// Subordinary tables could have different but convertible types, like numeric types of different width.
        /// We must return streams with structure equals to structure of Merge table.
-        convertingSourceStream(header, modified_context, modified_query_info.query, pipe, processed_stage);
+        convertingSourceStream(header, *modified_context, modified_query_info.query, pipe, processed_stage);
 
         pipe.addTableLock(struct_lock);
+        pipe.addInterpreterContext(modified_context);
+
     }
 }
 
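This is the Context-related segfault from the PR title: previously `modified_context` was a stack-local copy inside StorageMerge::read()/createSources(), while the sources built from it can outlive those calls, leaving them with a dangling reference once the query pipeline actually runs. Wrapping the copy in a shared_ptr and attaching it to each pipe via addInterpreterContext ties its lifetime to the pipeline. A hedged sketch of the two shapes, with simplified stand-ins for Context, Source and Pipe rather than the real classes:

#include <memory>
#include <string>
#include <vector>

struct Context { std::string settings = "max_threads=8"; };

// A source that keeps using the query context while the pipeline runs.
struct Source
{
    const Context * context = nullptr;             // non-owning
    std::string read() { return context->settings; }
};

struct Pipe
{
    Source source;
    std::vector<std::shared_ptr<Context>> holders; // keeps contexts alive

    void addInterpreterContext(std::shared_ptr<Context> ctx) { holders.push_back(std::move(ctx)); }
};

// Buggy shape: the Context copy lives on the stack of read() and dies when
// it returns, while the returned Pipe still points at it.
Pipe read_dangling(const Context & ctx)
{
    Context modified = ctx;          // stack copy
    Pipe pipe;
    pipe.source.context = &modified; // dangles after return
    return pipe;
}

// Fixed shape: the copy is heap-allocated and the Pipe co-owns it.
Pipe read_fixed(const Context & ctx)
{
    auto modified = std::make_shared<Context>(ctx);
    Pipe pipe;
    pipe.source.context = modified.get();
    pipe.addInterpreterContext(modified); // lifetime now tied to the pipe
    return pipe;
}

int main()
{
    Context ctx;
    Pipe pipe = read_fixed(ctx);       // read_dangling() would leave a dangling pointer
    auto s = pipe.source.read();       // safe: the shared Context is still alive
    (void)s;
}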
dbms/src/Storages/StorageMerge.h
@@ -81,7 +81,7 @@ protected:
         const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
         const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
         Names & real_column_names,
-        Context & modified_context, size_t streams_num, bool has_table_virtual_column,
+        const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
         bool concat_streams = false);
 
     void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
test .reference file (test_01083)
@@ -3,10 +3,9 @@ CREATE TABLE test_01083.buffer (`n` Int8) ENGINE = Buffer(\'test_01083\', \'file
 CREATE TABLE test_01083.merge (`n` Int8) ENGINE = Merge(\'test_01083\', \'distributed\')
 CREATE TABLE test_01083.merge_tf AS merge(\'test_01083\', \'.*\')
 CREATE TABLE test_01083.distributed (`n` Int8) ENGINE = Distributed(\'test_shard_localhost\', \'test_01083\', \'file\')
-CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'file\')
-CREATE TABLE test_01083.url (`n` UInt64, `_path` String) ENGINE = URL(\'https://localhost:8443/?query=select+n,+_path+from+test_01083.file+format+CSV\', \'CSV\')
-CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'url\')))
-1
-1
-1
-1
+CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'buffer\')
+CREATE TABLE test_01083.url (`n` UInt64, `col` String) ENGINE = URL(\'https://localhost:8443/?query=select+n,+_table+from+test_01083.merge+format+CSV\', \'CSV\')
+CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'view\')))
+CREATE VIEW test_01083.view (`n` Int64) AS SELECT toInt64(n) AS n FROM (SELECT toString(n) AS n FROM test_01083.merge WHERE _table != \'qwerty\' ORDER BY _table ASC) UNION ALL SELECT * FROM test_01083.file
+CREATE DICTIONARY test_01083.dict (`n` UInt64, `col` String DEFAULT \'42\') PRIMARY KEY n SOURCE(CLICKHOUSE(HOST \'localhost\' PORT 9440 SECURE 1 USER \'default\' TABLE \'url\' DB \'test_01083\')) LIFETIME(MIN 0 MAX 1) LAYOUT(CACHE(SIZE_IN_CELLS 1))
+16
test .sql file (test_01083)
@@ -7,24 +7,32 @@ CREATE TABLE buffer (n Int8) ENGINE = Buffer(currentDatabase(), file, 16, 10, 20
 CREATE TABLE merge (n Int8) ENGINE = Merge('', lower('DISTRIBUTED'));
 CREATE TABLE merge_tf as merge(currentDatabase(), '.*');
 CREATE TABLE distributed (n Int8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'fi' || 'le');
-CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'fi' || 'le');
+CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'buf' || 'fer');
 
-INSERT INTO file VALUES (1);
-CREATE TABLE url (n UInt64, _path String) ENGINE=URL
+INSERT INTO buffer VALUES (1);
+DETACH TABLE buffer; -- trigger flushing
+ATTACH TABLE buffer;
+
+CREATE TABLE url (n UInt64, col String) ENGINE=URL
 (
     replace
     (
-        'https://localhost:8443/?query=' || 'select n, _path from ' || currentDatabase() || '.file format CSV', ' ', '+' -- replace `file` with `merge` here after #9246 is fixed
+        'https://localhost:8443/?query=' || 'select n, _table from ' || currentDatabase() || '.merge format CSV', ' ', '+'
     ),
     CSV
 );
 
+CREATE VIEW view AS SELECT toInt64(n) as n FROM (SELECT toString(n) as n from merge WHERE _table != 'qwerty' ORDER BY _table) UNION ALL SELECT * FROM file;
+
 -- The following line is needed just to disable checking stderr for emptiness
 SELECT nonexistentsomething; -- { serverError 47 }
 
-CREATE DICTIONARY dict (n UInt64, _path String DEFAULT '42') PRIMARY KEY n
+CREATE DICTIONARY dict (n UInt64, col String DEFAULT '42') PRIMARY KEY n
 SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9440 SECURE 1 USER 'default' TABLE 'url' DB 'test_01083')) LIFETIME(1) LAYOUT(CACHE(SIZE_IN_CELLS 1));
 
+-- dict --> url --> merge |-> distributed -> file (1)
+--                        |-> distributed_tf -> buffer -> file (1)
+
 -- TODO make fuzz test from this
 CREATE TABLE rich_syntax as remote
 (
@@ -37,15 +45,16 @@ CREATE TABLE rich_syntax as remote
         '127.0.0.{1..4}',
         if
         (
-            toString(40 + 2.0) NOT IN ('hello', dictGetString(currentDatabase() || '.dict', '_path', toUInt64('0001'))),
+            toString(40 + 2) NOT IN ('hello', dictGetString(currentDatabase() || '.dict', 'col', toUInt64('0001'))),
             currentDatabase(),
             'FAIL'
         ),
-        extract('123url456', '[a-z]+')
+        extract('123view456', '[a-z]+')
     )
 )
 );
 
 
 SHOW CREATE file;
 SHOW CREATE buffer;
 SHOW CREATE merge;
@@ -54,7 +63,14 @@ SHOW CREATE distributed;
 SHOW CREATE distributed_tf;
 SHOW CREATE url;
 SHOW CREATE rich_syntax;
+SHOW CREATE view;
+SHOW CREATE dict;
 
-SELECT n from rich_syntax;
+INSERT INTO buffer VALUES (1);
+-- remote(localhost) --> cluster(test_shard_localhost) |-> remote(127.0.0.1) --> view |-> subquery --> merge |-> distributed --> file (1)
+--                                                     |                              |                      |-> distributed_tf -> buffer (1) -> file (1)
+--                                                     |                              |-> file (1)
+--                                                     |-> remote(127.0.0.2) --> ...
+SELECT sum(n) from rich_syntax;
 
 DROP DATABASE test_01083;