mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge branch 'master' of github.com:yandex/ClickHouse
This commit is contained in:
commit
708515dd1e
@ -2,7 +2,9 @@
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <regex>
|
||||
#include <sys/sysinfo.h>
|
||||
#if __has_include(<sys/sysinfo.h>)
|
||||
#include <sys/sysinfo.h>
|
||||
#endif
|
||||
#include <unistd.h>
|
||||
|
||||
#include <boost/program_options.hpp>
|
||||
@ -45,6 +47,7 @@ namespace ErrorCodes
|
||||
extern const int POCO_EXCEPTION;
|
||||
extern const int STD_EXCEPTION;
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
static String pad(size_t padding) {
|
||||
@ -670,6 +673,7 @@ private:
|
||||
|
||||
if (precondition == "ram_size")
|
||||
{
|
||||
#if __has_include(<sys/sysinfo.h>)
|
||||
struct sysinfo *system_information = new struct sysinfo();
|
||||
if (sysinfo(system_information))
|
||||
{
|
||||
@ -687,6 +691,9 @@ private:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
#else
|
||||
throw DB::Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (precondition == "table_exists")
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <ext/range.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Core/Names.h>
|
||||
#include <memory>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -338,7 +339,7 @@ ColumnPtr DictionaryBlockInputStream<DictionaryType, Key>::getColumnFromStringAt
|
||||
const Columns & keys, const DataTypes & data_types,
|
||||
const DictionaryAttribute& attribute, const DictionaryType& dictionary) const
|
||||
{
|
||||
auto column_string = std::make_unique<ColumnString>();
|
||||
auto column_string = std::make_shared<ColumnString>();
|
||||
auto ptr = column_string.get();
|
||||
callGetter(getter, ids, keys, data_types, ptr, attribute, dictionary);
|
||||
return column_string;
|
||||
|
@ -209,9 +209,6 @@ void InJoinSubqueriesPreprocessor::process(ASTSelectQuery * query) const
|
||||
|
||||
bool InJoinSubqueriesPreprocessor::hasAtLeastTwoShards(const IStorage & table) const
|
||||
{
|
||||
if (!table.isRemote())
|
||||
return false;
|
||||
|
||||
const StorageDistributed * distributed = typeid_cast<const StorageDistributed *>(&table);
|
||||
if (!distributed)
|
||||
return false;
|
||||
|
@ -838,24 +838,13 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
|
||||
if (max_streams > 1 && !is_remote)
|
||||
max_streams *= settings.max_streams_to_max_threads_ratio;
|
||||
|
||||
ASTPtr actual_query_ptr;
|
||||
if (storage->isRemote())
|
||||
{
|
||||
/// In case of a remote query, we send only SELECT, which will be executed.
|
||||
actual_query_ptr = query.cloneFirstSelect();
|
||||
}
|
||||
else
|
||||
actual_query_ptr = query_ptr;
|
||||
|
||||
/// PREWHERE optimization
|
||||
{
|
||||
auto optimize_prewhere = [&](auto & merge_tree)
|
||||
{
|
||||
const ASTSelectQuery & actual_select = typeid_cast<const ASTSelectQuery &>(*actual_query_ptr);
|
||||
|
||||
/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
|
||||
if (settings.optimize_move_to_prewhere && actual_select.where_expression && !actual_select.prewhere_expression && !actual_select.final())
|
||||
MergeTreeWhereOptimizer{actual_query_ptr, context, merge_tree.getData(), required_columns, log};
|
||||
if (settings.optimize_move_to_prewhere && query.where_expression && !query.prewhere_expression && !query.final())
|
||||
MergeTreeWhereOptimizer{query_ptr, context, merge_tree.getData(), required_columns, log};
|
||||
};
|
||||
|
||||
if (const StorageMergeTree * merge_tree = typeid_cast<const StorageMergeTree *>(storage.get()))
|
||||
@ -864,8 +853,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
|
||||
optimize_prewhere(*merge_tree);
|
||||
}
|
||||
|
||||
streams = storage->read(required_columns, actual_query_ptr,
|
||||
context, from_stage, max_block_size, max_streams);
|
||||
streams = storage->read(required_columns, query_ptr, context, from_stage, max_block_size, max_streams);
|
||||
|
||||
if (alias_actions)
|
||||
{
|
||||
@ -1316,11 +1304,6 @@ void InterpreterSelectQuery::executeLimit()
|
||||
|
||||
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets & subqueries_for_sets)
|
||||
{
|
||||
/// If the query is not distributed, then remove the creation of temporary tables from subqueries (intended for sending to remote servers).
|
||||
if (!(storage && storage->isRemote()))
|
||||
for (auto & elem : subqueries_for_sets)
|
||||
elem.second.table.reset();
|
||||
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
|
||||
executeUnion();
|
||||
|
@ -173,7 +173,7 @@ ASTPtr ASTSelectQuery::clone() const
|
||||
return ptr;
|
||||
}
|
||||
|
||||
ASTPtr ASTSelectQuery::cloneFirstSelect() const
|
||||
std::shared_ptr<ASTSelectQuery> ASTSelectQuery::cloneFirstSelect() const
|
||||
{
|
||||
auto res = cloneImpl(false);
|
||||
res->prev_union_all = nullptr;
|
||||
|
@ -39,7 +39,7 @@ public:
|
||||
ASTPtr clone() const override;
|
||||
|
||||
/// Get a deep copy of the first SELECT query tree.
|
||||
ASTPtr cloneFirstSelect() const;
|
||||
std::shared_ptr<ASTSelectQuery> cloneFirstSelect() const;
|
||||
|
||||
private:
|
||||
std::shared_ptr<ASTSelectQuery> cloneImpl(bool traverse_union_all) const;
|
||||
|
@ -60,8 +60,8 @@ namespace
|
||||
/// Creates a copy of query, changes database and table names.
|
||||
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
|
||||
{
|
||||
auto modified_query_ast = query->clone();
|
||||
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
|
||||
auto modified_query_ast = typeid_cast<const ASTSelectQuery &>(*query).cloneFirstSelect();
|
||||
modified_query_ast->replaceDatabaseAndTable(database, table);
|
||||
return modified_query_ast;
|
||||
}
|
||||
|
||||
|
@ -50,6 +50,19 @@ StorageMerge::StorageMerge(
|
||||
{
|
||||
}
|
||||
|
||||
bool StorageMerge::isRemote() const
|
||||
{
|
||||
auto database = context.getDatabase(source_database);
|
||||
auto iterator = database->getIterator();
|
||||
|
||||
while (iterator->isValid())
|
||||
{
|
||||
if (table_name_regexp.match(iterator->name()) && iterator->table()->isRemote())
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
|
||||
{
|
||||
auto type = VirtualColumnFactory::tryGetType(column_name);
|
||||
|
@ -20,6 +20,8 @@ public:
|
||||
std::string getName() const override { return "Merge"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
|
||||
bool isRemote() const override;
|
||||
|
||||
/// The check is delayed to the read method. It checks the support of the tables used.
|
||||
bool supportsSampling() const override { return true; }
|
||||
bool supportsPrewhere() const override { return true; }
|
||||
|
@ -113,7 +113,6 @@ class ClickHouseCluster:
|
||||
|
||||
for instance in self.instances.values():
|
||||
instance.docker_client = None
|
||||
instance.docker_id = None
|
||||
instance.ip_address = None
|
||||
instance.client = None
|
||||
|
||||
|
@ -0,0 +1,18 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<test_cluster>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster>
|
||||
</remote_servers>
|
||||
</yandex>
|
@ -0,0 +1,48 @@
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
|
||||
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
for node in (node1, node2):
|
||||
node.query('''
|
||||
CREATE TABLE local_table(id UInt32, val String) ENGINE = TinyLog;
|
||||
''')
|
||||
|
||||
node1.query("INSERT INTO local_table VALUES (1, 'node1')")
|
||||
node2.query("INSERT INTO local_table VALUES (2, 'node2')")
|
||||
|
||||
node1.query('''
|
||||
CREATE TABLE distributed_table(id UInt32, val String) ENGINE = Distributed(test_cluster, default, local_table);
|
||||
CREATE TABLE merge_table(id UInt32, val String) ENGINE = Merge(default, '^distributed_table')
|
||||
''')
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_global_in(started_cluster):
|
||||
assert node1.query("SELECT val FROM distributed_table WHERE id GLOBAL IN (SELECT toUInt32(3 - id) FROM local_table)").rstrip() \
|
||||
== 'node2'
|
||||
|
||||
assert node1.query("SELECT val FROM merge_table WHERE id GLOBAL IN (SELECT toUInt32(3 - id) FROM local_table)").rstrip() \
|
||||
== 'node2'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with contextmanager(started_cluster)() as cluster:
|
||||
for name, instance in cluster.instances.items():
|
||||
print name, instance.ip_address
|
||||
raw_input("Cluster created, press any key to destroy...")
|
Loading…
Reference in New Issue
Block a user