Merge pull request #6 from nikvas0/nikvas0/index_replication

Nikvas0/index replication
Nikita Vasilev, 2019-01-10 20:51:31 +03:00, committed by GitHub
Commit 4d4af73480
No known key found for this signature in database. GPG Key ID: 4AEE18F83AFDEB23
78 changed files with 742 additions and 553 deletions

contrib/jemalloc vendored (2 changes)

@ -1 +1 @@
Subproject commit cd2931ad9bbd78208565716ab102e86d858c2fff
Subproject commit 41b7372eadee941b9164751b8d4963f915d3ceae

View File

@ -16,7 +16,7 @@ set (CONFIG_VERSION ${CMAKE_CURRENT_BINARY_DIR}/src/Common/config_version.h)
set (CONFIG_COMMON ${CMAKE_CURRENT_BINARY_DIR}/src/Common/config.h)
include (cmake/version.cmake)
message (STATUS "Will build ${VERSION_FULL}")
message (STATUS "Will build ${VERSION_FULL} revision ${VERSION_REVISION}")
configure_file (src/Common/config.h.in ${CONFIG_COMMON})
configure_file (src/Common/config_version.h.in ${CONFIG_VERSION})

View File

@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54413 CACHE STRING "") # changed manually for tests
set(VERSION_MAJOR 19 CACHE STRING "")
set(VERSION_MINOR 1 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
set(VERSION_GITHASH 014e344a36bc19a58621e0add379984cf62b9067 CACHE STRING "")
set(VERSION_DESCRIBE v19.1.0-testing CACHE STRING "")
set(VERSION_STRING 19.1.0 CACHE STRING "")
set(VERSION_REVISION 54413)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 1)
set(VERSION_PATCH 0)
set(VERSION_GITHASH 014e344a36bc19a58621e0add379984cf62b9067)
set(VERSION_DESCRIBE v19.1.0-testing)
set(VERSION_STRING 19.1.0)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")
@ -19,8 +19,8 @@ if (VERSION_EXTRA)
string(CONCAT VERSION_STRING ${VERSION_STRING} "." ${VERSION_EXTRA})
endif ()
set (VERSION_NAME "${PROJECT_NAME}" CACHE STRING "")
set (VERSION_FULL "${VERSION_NAME} ${VERSION_STRING}" CACHE STRING "")
set (VERSION_SO "${VERSION_STRING}" CACHE STRING "")
set (VERSION_NAME "${PROJECT_NAME}")
set (VERSION_FULL "${VERSION_NAME} ${VERSION_STRING}")
set (VERSION_SO "${VERSION_STRING}")
math (EXPR VERSION_INTEGER "${VERSION_PATCH} + ${VERSION_MINOR}*1000 + ${VERSION_MAJOR}*1000000")

View File

@ -243,7 +243,7 @@ struct ClusterPartition
UInt64 rows_copied = 0;
UInt64 blocks_copied = 0;
size_t total_tries = 0;
UInt64 total_tries = 0;
};
@ -340,7 +340,7 @@ struct TaskCluster
String default_local_database;
/// Limits number of simultaneous workers
size_t max_workers = 0;
UInt64 max_workers = 0;
/// Base settings for pull and push
Settings settings_common;
@ -773,11 +773,11 @@ public:
}
template <typename T>
decltype(auto) retry(T && func, size_t max_tries = 100)
decltype(auto) retry(T && func, UInt64 max_tries = 100)
{
std::exception_ptr exception;
for (size_t try_number = 1; try_number <= max_tries; ++try_number)
for (UInt64 try_number = 1; try_number <= max_tries; ++try_number)
{
try
{
@ -880,7 +880,7 @@ public:
}
/// Compute set of partitions, assume set of partitions aren't changed during the processing
void discoverTablePartitions(TaskTable & task_table, size_t num_threads = 0)
void discoverTablePartitions(TaskTable & task_table, UInt64 num_threads = 0)
{
/// Fetch partitions list from a shard
{
@ -985,7 +985,7 @@ public:
/// Retry table processing
bool table_is_done = false;
for (size_t num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
{
if (tryProcessTable(task_table))
{
@ -1044,7 +1044,7 @@ protected:
String workers_path = getWorkersPath();
String current_worker_path = getCurrentWorkerNodePath();
size_t num_bad_version_errors = 0;
UInt64 num_bad_version_errors = 0;
while (true)
{
@ -1055,7 +1055,7 @@ protected:
auto version = stat.version;
zookeeper->get(workers_path, &stat);
if (static_cast<size_t>(stat.numChildren) >= task_cluster->max_workers)
if (static_cast<UInt64>(stat.numChildren) >= task_cluster->max_workers)
{
LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
<< ". Postpone processing " << description);
@ -1163,7 +1163,7 @@ protected:
}
// If all task is finished and zxid is not changed then partition could not become dirty again
for (size_t shard_num = 0; shard_num < status_paths.size(); ++shard_num)
for (UInt64 shard_num = 0; shard_num < status_paths.size(); ++shard_num)
{
if (zxid1[shard_num] != zxid2[shard_num])
{
@ -1280,7 +1280,7 @@ protected:
LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
/// Limit number of max executing replicas to 1
size_t num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ONE, 1);
UInt64 num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ONE, 1);
if (num_shards < cluster_push->getShardCount())
{
@ -1299,8 +1299,8 @@ protected:
}
static constexpr size_t max_table_tries = 1000;
static constexpr size_t max_shard_partition_tries = 600;
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
bool tryProcessTable(TaskTable & task_table)
{
@ -1317,7 +1317,7 @@ protected:
Stopwatch watch;
TasksShard expected_shards;
size_t num_failed_shards = 0;
UInt64 num_failed_shards = 0;
++cluster_partition.total_tries;
@ -1368,7 +1368,7 @@ protected:
bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
PartitionTaskStatus task_status = PartitionTaskStatus::Error;
bool was_error = false;
for (size_t try_num = 0; try_num < max_shard_partition_tries; ++try_num)
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(partition, is_unprioritized_task);
@ -1434,8 +1434,8 @@ protected:
}
}
size_t required_partitions = task_table.cluster_partitions.size();
size_t finished_partitions = task_table.finished_cluster_partitions.size();
UInt64 required_partitions = task_table.cluster_partitions.size();
UInt64 finished_partitions = task_table.finished_cluster_partitions.size();
bool table_is_done = finished_partitions >= required_partitions;
if (!table_is_done)
@ -1645,7 +1645,7 @@ protected:
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: " << query);
size_t shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push,
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables " << getDatabaseDotTable(task_table.table_push) << " have been created on " << shards
<< " shards of " << task_table.cluster_push->getShardCount());
@ -1699,7 +1699,7 @@ protected:
std::future<Coordination::ExistsResponse> future_is_dirty_checker;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
constexpr size_t check_period_milliseconds = 500;
constexpr UInt64 check_period_milliseconds = 500;
/// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copy data
auto cancel_check = [&] ()
@ -1917,16 +1917,16 @@ protected:
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
* Returns number of shards for which at least one replica executed query successfully
*/
size_t executeQueryOnCluster(
UInt64 executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const ASTPtr & query_ast_ = nullptr,
const Settings * settings = nullptr,
PoolMode pool_mode = PoolMode::GET_ALL,
size_t max_successful_executions_per_shard = 0) const
UInt64 max_successful_executions_per_shard = 0) const
{
auto num_shards = cluster->getShardsInfo().size();
std::vector<size_t> per_shard_num_successful_replicas(num_shards, 0);
std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
ASTPtr query_ast;
if (query_ast_ == nullptr)
@ -1939,10 +1939,10 @@ protected:
/// We need to execute query on one replica at least
auto do_for_shard = [&] (size_t shard_index)
auto do_for_shard = [&] (UInt64 shard_index)
{
const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
size_t & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
num_successful_executions = 0;
auto increment_and_check_exit = [&] ()
@ -1951,12 +1951,12 @@ protected:
return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
};
size_t num_replicas = cluster->getShardsAddresses().at(shard_index).size();
size_t num_local_replicas = shard.getLocalNodeCount();
size_t num_remote_replicas = num_replicas - num_local_replicas;
UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
UInt64 num_local_replicas = shard.getLocalNodeCount();
UInt64 num_remote_replicas = num_replicas - num_local_replicas;
/// In that case we don't have local replicas, but do it just in case
for (size_t i = 0; i < num_local_replicas; ++i)
for (UInt64 i = 0; i < num_local_replicas; ++i)
{
auto interpreter = InterpreterFactory::get(query_ast, context);
interpreter->execute();
@ -1997,16 +1997,16 @@ protected:
};
{
ThreadPool thread_pool(std::min(num_shards, getNumberOfPhysicalCPUCores()));
ThreadPool thread_pool(std::min(num_shards, UInt64(getNumberOfPhysicalCPUCores())));
for (size_t shard_index = 0; shard_index < num_shards; ++shard_index)
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
thread_pool.wait();
}
size_t successful_shards = 0;
for (size_t num_replicas : per_shard_num_successful_replicas)
UInt64 successful_shards = 0;
for (UInt64 num_replicas : per_shard_num_successful_replicas)
successful_shards += (num_replicas > 0);
return successful_shards;

View File

@ -123,7 +123,7 @@ UInt64 hash(Ts... xs)
UInt64 maskBits(UInt64 x, size_t num_bits)
{
return x & ((1 << num_bits) - 1);
return x & ((1ULL << num_bits) - 1);
}
@ -149,7 +149,7 @@ UInt64 feistelNetwork(UInt64 x, size_t num_bits, UInt64 seed, size_t num_rounds
UInt64 bits = maskBits(x, num_bits);
for (size_t i = 0; i < num_rounds; ++i)
bits = feistelRound(bits, num_bits, seed, i);
return (x & ~((1 << num_bits) - 1)) ^ bits;
return (x & ~((1ULL << num_bits) - 1)) ^ bits;
}
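
The switch from 1 to 1ULL in the two masks above matters because a plain 1 is a 32-bit int: the shift is then performed in int, which is undefined behaviour once num_bits reaches the width of int and in any case cannot produce a mask covering the upper half of a UInt64. A minimal standalone sketch of the corrected mask (the function name and test value below are illustrative, not taken from the commit):

#include <cassert>
#include <cstddef>
#include <cstdint>

// With 1ULL the shift is done in 64 bits, so masks wider than 31 bits work.
uint64_t maskLowBits(uint64_t x, size_t num_bits)
{
    return x & ((1ULL << num_bits) - 1);
}

int main()
{
    // A 40-bit mask would be silently truncated with a 32-bit literal.
    assert(maskLowBits(0xFFFFFFFFFFFFFFFFULL, 40) == 0xFFFFFFFFFFULL);
    return 0;
}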

View File

@ -133,7 +133,7 @@ public:
}
bool valuesHaveFixedSize() const override { return getDictionary().valuesHaveFixedSize(); }
bool isFixedAndContiguous() const override { return getDictionary().isFixedAndContiguous(); }
bool isFixedAndContiguous() const override { return false; }
size_t sizeOfValueIfFixed() const override { return getDictionary().sizeOfValueIfFixed(); }
bool isNumeric() const override { return getDictionary().isNumeric(); }
bool lowCardinality() const override { return true; }

View File

@ -3,7 +3,6 @@
#include "CurrentThread.h"
#include <common/logger_useful.h>
#include <Common/ThreadStatus.h>
#include <Common/ObjectPool.h>
#include <Common/TaskStatsInfoGetter.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Context.h>
@ -24,8 +23,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
SimpleObjectPool<TaskStatsInfoGetter> task_stats_info_getter_pool;
// Smoker's implementation to avoid thread_local usage: error: undefined symbol: __cxa_thread_atexit
#if defined(ARCADIA_ROOT)
struct ThreadStatusPtrHolder : ThreadStatusPtr

View File

@ -36,7 +36,7 @@ namespace
if (0 != pipe2(fds_rw, O_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
#else
if (0 != pipe(fds))
if (0 != pipe(fds_rw))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);
if (0 != fcntl(fds_rw[0], F_SETFD, FD_CLOEXEC))
DB::throwFromErrno("Cannot create pipe", DB::ErrorCodes::CANNOT_PIPE);

View File

@ -21,9 +21,6 @@ namespace ErrorCodes
}
extern SimpleObjectPool<TaskStatsInfoGetter> task_stats_info_getter_pool;
TasksStatsCounters TasksStatsCounters::current()
{
TasksStatsCounters res;
@ -74,7 +71,7 @@ void ThreadStatus::initPerformanceCounters()
if (TaskStatsInfoGetter::checkPermissions())
{
if (!taskstats_getter)
taskstats_getter = task_stats_info_getter_pool.getDefault();
taskstats_getter = std::make_unique<TaskStatsInfoGetter>();
*last_taskstats = TasksStatsCounters::current();
}

View File

@ -2,7 +2,6 @@
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/ObjectPool.h>
#include <IO/Progress.h>
@ -175,8 +174,7 @@ protected:
std::unique_ptr<TasksStatsCounters> last_taskstats;
/// Set to non-nullptr only if we have enough capabilities.
/// We use pool because creation and destruction of TaskStatsInfoGetter objects are expensive.
SimpleObjectPool<TaskStatsInfoGetter>::Pointer taskstats_getter;
std::unique_ptr<TaskStatsInfoGetter> taskstats_getter;
};
}

View File

@ -1039,8 +1039,8 @@ void ZooKeeper::sendThread()
{
/// Wait for the next request in queue. No more than operation timeout. No more than until next heartbeat time.
UInt64 max_wait = std::min(
std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count(),
operation_timeout.totalMilliseconds());
UInt64(std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count()),
UInt64(operation_timeout.totalMilliseconds()));
RequestInfo info;
if (requests_queue.tryPop(info, max_wait))
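
The UInt64(...) wrappers above are needed because std::min is a single-type template: the duration_cast<milliseconds>(...).count() result and operation_timeout.totalMilliseconds() have different integer types, so without a common type the call does not compile. A tiny standalone illustration of the same pattern (the variable names and values below are placeholders, not the ZooKeeper ones):

#include <algorithm>
#include <chrono>
#include <cstdint>

int main()
{
    auto until_heartbeat = std::chrono::milliseconds(1500).count(); // typically long long
    long operation_timeout_ms = 10000;                              // a different integer type

    // std::min(until_heartbeat, operation_timeout_ms) would fail to deduce a
    // single template type; casting both sides to one 64-bit type mirrors the fix above.
    uint64_t max_wait = std::min(static_cast<uint64_t>(until_heartbeat),
                                 static_cast<uint64_t>(operation_timeout_ms));
    return max_wait == 1500 ? 0 : 1;
}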

View File

@ -16,6 +16,7 @@
#cmakedefine01 USE_BASE64
#cmakedefine01 USE_HDFS
#cmakedefine01 USE_XXHASH
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 LLVM_HAS_RTTI

View File

@ -63,7 +63,7 @@ CompressionCodecZSTD::CompressionCodecZSTD(int level_)
void registerCodecZSTD(CompressionCodecFactory & factory)
{
UInt8 method_code = static_cast<char>(CompressionMethodByte::ZSTD);
UInt8 method_code = UInt8(CompressionMethodByte::ZSTD);
factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = CompressionCodecZSTD::ZSTD_DEFAULT_LEVEL;

View File

@ -78,7 +78,7 @@ Block MergeSortingBlockInputStream::readImpl()
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(new Poco::TemporaryFile(tmp_path));
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);

View File

@ -508,6 +508,10 @@ void DataTypeLowCardinality::serializeBinaryBulkWithMultipleStreams(
size_t max_limit = column.size() - offset;
limit = limit ? std::min(limit, max_limit) : max_limit;
/// Do not write anything for empty column. (May happen while writing empty arrays.)
if (limit == 0)
return;
auto sub_column = low_cardinality_column.cutAndCompact(offset, limit);
ColumnPtr positions = sub_column->getIndexesPtr();
ColumnPtr keys = sub_column->getDictionary().getNestedColumn();

View File

@ -73,7 +73,7 @@ public:
{
size_t language_id = static_cast<size_t>(language);
if (region_id > names_refs[language_id].size())
if (region_id >= names_refs[language_id].size())
return StringRef("", 0);
StringRef ref = names_refs[language_id][region_id];
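
The change from > to >= above fixes an off-by-one in the bounds check: a container of size N only has valid indices 0..N-1, so a region_id equal to size() must also return the empty StringRef instead of reading past the end. A minimal standalone sketch of the same rule (the helper below is illustrative, not the RegionsNames code):

#include <cstddef>
#include <string>
#include <vector>

// Return the stored name, or an empty string when region_id is out of range.
std::string getNameOrEmpty(const std::vector<std::string> & names, size_t region_id)
{
    if (region_id >= names.size()) // >= because names[names.size()] is out of bounds
        return {};
    return names[region_id];
}

int main()
{
    std::vector<std::string> names{"moscow", "berlin"};
    return getNameOrEmpty(names, 2).empty() ? 0 : 1;
}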

View File

@ -512,8 +512,8 @@ static std::optional<DataTypes> removeNullables(const DataTypes & types)
if (!typeid_cast<const DataTypeNullable *>(type.get()))
continue;
DataTypes filtered;
for (const auto & type : types)
filtered.emplace_back(removeNullable(type));
for (const auto & sub_type : types)
filtered.emplace_back(removeNullable(sub_type));
return filtered;
}
return {};

View File

@ -132,7 +132,7 @@ void FunctionHasColumnInTable::executeImpl(Block & block, const ColumnNumbers &
has_column = remote_columns.hasPhysical(column_name);
}
block.getByPosition(result).column = DataTypeUInt8().createColumnConst(input_rows_count, has_column);
block.getByPosition(result).column = DataTypeUInt8().createColumnConst(input_rows_count, Field(has_column));
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <cstddef>
#include <common/Types.h>
#include <Core/Defines.h>

View File

@ -48,7 +48,7 @@ ExpressionActionsPtr AnalyzedJoin::createJoinedBlockActions(
source_column_names.emplace_back(column.name_and_type);
ASTPtr query = expression_list;
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, source_column_names, required_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, source_column_names, required_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context, {}, required_columns);
auto joined_block_actions = analyzer.getActions(false);

View File

@ -14,14 +14,8 @@ namespace DB
namespace ClusterProxy
{
BlockInputStreams executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster,
const ASTPtr & query_ast, const Context & context, const Settings & settings)
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings)
{
BlockInputStreams res;
const std::string query = queryToString(query_ast);
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
@ -39,6 +33,19 @@ BlockInputStreams executeQuery(
Context new_context(context);
new_context.setSettings(new_settings);
return new_context;
}
BlockInputStreams executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster,
const ASTPtr & query_ast, const Context & context, const Settings & settings)
{
BlockInputStreams res;
const std::string query = queryToString(query_ast);
Context new_context = removeUserRestrictionsFromSettings(context, settings);
ThrottlerPtr user_level_throttler;
if (auto process_list_element = context.getProcessListElement())
user_level_throttler = process_list_element->getUserNetworkThrottler();

View File

@ -16,6 +16,10 @@ namespace ClusterProxy
class IStreamFactory;
/// removes different restrictions (like max_concurrent_queries_for_user, max_memory_usage_for_user, etc.)
/// from settings and creates new context with them
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings);
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
/// (currently SELECT, DESCRIBE).
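
For context, the helper declared above copies the caller's settings, strips the per-user limits, and returns a fresh Context carrying the relaxed settings, which is what the executeQuery change earlier in this diff now does internally. A simplified, self-contained sketch of that pattern; the stand-in Settings/Context structs, their fields, and the zero-means-unlimited convention are illustrative assumptions, not the real DB types:

#include <cstdint>

// Stand-in types: the real DB::Settings / DB::Context are far richer.
struct Settings
{
    uint64_t max_concurrent_queries_for_user = 8;
    uint64_t max_memory_usage_for_user = 10'000'000'000ULL;
};

struct Context
{
    Settings settings;
};

// Copy the settings, drop the per-user restrictions, and return a new
// context that carries the relaxed settings (0 means "unlimited" here).
Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings)
{
    Settings new_settings = settings;
    new_settings.max_concurrent_queries_for_user = 0;
    new_settings.max_memory_usage_for_user = 0;

    Context new_context(context);
    new_context.settings = new_settings;
    return new_context;
}

int main()
{
    Context ctx;
    Context relaxed = removeUserRestrictionsFromSettings(ctx, ctx.settings);
    return relaxed.settings.max_memory_usage_for_user == 0 ? 0 : 1;
}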

View File

@ -37,7 +37,7 @@ static ASTPtr addTypeConversion(std::unique_ptr<ASTLiteral> && ast, const String
return res;
}
bool ExecuteScalarSubqueriesMatcher::needChildVisit(ASTPtr & node, const ASTPtr &)
bool ExecuteScalarSubqueriesMatcher::needChildVisit(ASTPtr & node, const ASTPtr & child)
{
/// Processed
if (typeid_cast<ASTSubquery *>(node.get()) ||
@ -48,6 +48,14 @@ bool ExecuteScalarSubqueriesMatcher::needChildVisit(ASTPtr & node, const ASTPtr
if (typeid_cast<ASTTableExpression *>(node.get()))
return false;
if (typeid_cast<ASTSelectQuery *>(node.get()))
{
/// Do not go to FROM, JOIN, UNION.
if (typeid_cast<ASTTableExpression *>(child.get()) ||
typeid_cast<ASTSelectQuery *>(child.get()))
return false;
}
return true;
}

View File

@ -161,21 +161,21 @@ auto wrapJITSymbolResolver(llvm::JITSymbolResolver & jsr)
// Actually this should work for 7.0.0 but now we have OLDER 7.0.0svn in contrib
auto flags = [&](const llvm::orc::SymbolNameSet & symbols)
{
llvm::orc::SymbolFlagsMap flags;
llvm::orc::SymbolFlagsMap flags_map;
for (const auto & symbol : symbols)
{
auto resolved = jsr.lookupFlags({*symbol});
if (resolved && resolved->size())
flags.emplace(symbol, resolved->begin()->second);
flags_map.emplace(symbol, resolved->begin()->second);
}
return flags;
return flags_map;
};
#endif
auto symbols = [&](std::shared_ptr<llvm::orc::AsynchronousSymbolQuery> query, llvm::orc::SymbolNameSet symbols)
auto symbols = [&](std::shared_ptr<llvm::orc::AsynchronousSymbolQuery> query, llvm::orc::SymbolNameSet symbols_set)
{
llvm::orc::SymbolNameSet missing;
for (const auto & symbol : symbols)
for (const auto & symbol : symbols_set)
{
auto resolved = jsr.lookup({*symbol});
if (resolved && resolved->size())
@ -275,20 +275,20 @@ struct LLVMContext
{
if (!module->size())
return 0;
llvm::PassManagerBuilder builder;
llvm::PassManagerBuilder pass_manager_builder;
llvm::legacy::PassManager mpm;
llvm::legacy::FunctionPassManager fpm(module.get());
builder.OptLevel = 3;
builder.SLPVectorize = true;
builder.LoopVectorize = true;
builder.RerollLoops = true;
builder.VerifyInput = true;
builder.VerifyOutput = true;
machine->adjustPassManager(builder);
pass_manager_builder.OptLevel = 3;
pass_manager_builder.SLPVectorize = true;
pass_manager_builder.LoopVectorize = true;
pass_manager_builder.RerollLoops = true;
pass_manager_builder.VerifyInput = true;
pass_manager_builder.VerifyOutput = true;
machine->adjustPassManager(pass_manager_builder);
fpm.add(llvm::createTargetTransformInfoWrapperPass(machine->getTargetIRAnalysis()));
mpm.add(llvm::createTargetTransformInfoWrapperPass(machine->getTargetIRAnalysis()));
builder.populateFunctionPassManager(fpm);
builder.populateModulePassManager(mpm);
pass_manager_builder.populateFunctionPassManager(fpm);
pass_manager_builder.populateModulePassManager(mpm);
fpm.doInitialization();
for (auto & function : *module)
fpm.run(function);

View File

@ -243,7 +243,7 @@ static ColumnsDeclarationAndModifiers parseColumns(const ASTExpressionList & col
/// set missing types and wrap default_expression's in a conversion-function if necessary
if (!defaulted_columns.empty())
{
auto syntax_analyzer_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, columns);
auto syntax_analyzer_result = SyntaxAnalyzer(context).analyze(default_expr_list, columns);
const auto actions = ExpressionAnalyzer(default_expr_list, syntax_analyzer_result, context).getActions(true);
const auto block = actions->getSampleBlock();

View File

@ -99,7 +99,6 @@ BlockIO InterpreterInsertQuery::execute()
out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, table, context, query_ptr, query.no_destination);
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()))

View File

@ -184,8 +184,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
table_lock = storage->lockStructure(false);
syntax_analyzer_result = SyntaxAnalyzer(context, storage)
.analyze(query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, subquery_depth);
syntax_analyzer_result = SyntaxAnalyzer(context, subquery_depth).analyze(
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage);
query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, NamesAndTypesList(), required_result_column_names, subquery_depth, !only_analyze);
@ -792,7 +792,7 @@ void InterpreterSelectQuery::executeFetchColumns(
}
auto additional_source_columns_set = ext::map<NameSet>(additional_source_columns, [] (const auto & it) { return it.name; });
auto syntax_result = SyntaxAnalyzer(context, storage).analyze(required_columns_expr_list, additional_source_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(required_columns_expr_list, additional_source_columns, {}, storage);
alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActions(true);
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
@ -829,7 +829,7 @@ void InterpreterSelectQuery::executeFetchColumns(
}
prewhere_info->prewhere_actions = std::move(new_actions);
auto analyzed_result = SyntaxAnalyzer(context, {}).analyze(required_prewhere_columns_expr_list, storage->getColumns().getAllPhysical());
auto analyzed_result = SyntaxAnalyzer(context).analyze(required_prewhere_columns_expr_list, storage->getColumns().getAllPhysical());
prewhere_info->alias_actions =
ExpressionAnalyzer(required_prewhere_columns_expr_list, analyzed_result, context)
.getActions(true, false);

View File

@ -21,6 +21,7 @@
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <csignal>
#include <algorithm>
namespace DB
@ -289,7 +290,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
if (replica_names.empty())
return;
ThreadPool pool(std::min(getNumberOfPhysicalCPUCores(), replica_names.size()));
ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
for (auto & table : replica_names)
pool.schedule([&] () { tryRestartReplica(table.first, table.second, system_context); });
pool.wait();

View File

@ -194,7 +194,7 @@ void MutationsInterpreter::prepare(bool dry_run)
if (col_default.kind == ColumnDefaultKind::Materialized)
{
auto query = col_default.expression->clone();
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, all_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
ExpressionAnalyzer analyzer(query, syntax_result, context);
for (const String & dependency : analyzer.getRequiredSourceColumns())
{
@ -203,10 +203,9 @@ void MutationsInterpreter::prepare(bool dry_run)
}
}
}
}
if (!updated_columns.empty())
validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
}
/// First, break a sequence of commands into stages.
stages.emplace_back(context);
@ -301,7 +300,7 @@ void MutationsInterpreter::prepare(bool dry_run)
for (const String & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(all_asts, all_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(all_asts, all_columns);
stage.analyzer = std::make_unique<ExpressionAnalyzer>(all_asts, syntax_result, context);
ExpressionActionsChain & actions_chain = stage.expressions_chain;

View File

@ -0,0 +1,108 @@
#include <Common/typeid_cast.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTExpressionList.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
static bool tryExtractConstValueFromCondition(const ASTPtr & condition, bool & value)
{
/// numeric constant in condition
if (const ASTLiteral * literal = typeid_cast<ASTLiteral *>(condition.get()))
{
if (literal->value.getType() == Field::Types::Int64 ||
literal->value.getType() == Field::Types::UInt64)
{
value = literal->value.get<Int64>();
return true;
}
}
/// cast of numeric constant in condition to UInt8
if (const ASTFunction * function = typeid_cast<ASTFunction * >(condition.get()))
{
if (function->name == "CAST")
{
if (ASTExpressionList * expr_list = typeid_cast<ASTExpressionList *>(function->arguments.get()))
{
const ASTPtr & type_ast = expr_list->children.at(1);
if (const ASTLiteral * type_literal = typeid_cast<ASTLiteral *>(type_ast.get()))
{
if (type_literal->value.getType() == Field::Types::String &&
type_literal->value.get<std::string>() == "UInt8")
return tryExtractConstValueFromCondition(expr_list->children.at(0), value);
}
}
}
}
return false;
}
void OptimizeIfWithConstantConditionVisitor::visit(ASTPtr & current_ast)
{
if (!current_ast)
return;
for (ASTPtr & child : current_ast->children)
{
auto * function_node = typeid_cast<ASTFunction *>(child.get());
if (!function_node || function_node->name != "if")
{
visit(child);
continue;
}
visit(function_node->arguments);
auto * args = typeid_cast<ASTExpressionList *>(function_node->arguments.get());
if (args->children.size() != 3)
throw Exception("Wrong number of arguments for function 'if' (" + toString(args->children.size()) + " instead of 3)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTPtr condition_expr = args->children[0];
ASTPtr then_expr = args->children[1];
ASTPtr else_expr = args->children[2];
bool condition;
if (tryExtractConstValueFromCondition(condition_expr, condition))
{
ASTPtr replace_ast = condition ? then_expr : else_expr;
ASTPtr child_copy = child;
String replace_alias = replace_ast->tryGetAlias();
String if_alias = child->tryGetAlias();
if (replace_alias.empty())
{
replace_ast->setAlias(if_alias);
child = replace_ast;
}
else
{
/// Only copy of one node is required here.
/// But IAST has only method for deep copy of subtree.
/// This can be a reason of performance degradation in case of deep queries.
ASTPtr replace_ast_deep_copy = replace_ast->clone();
replace_ast_deep_copy->setAlias(if_alias);
child = replace_ast_deep_copy;
}
if (!if_alias.empty())
{
auto alias_it = aliases.find(if_alias);
if (alias_it != aliases.end() && alias_it->second.get() == child_copy.get())
alias_it->second = child;
}
}
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <unordered_map>
#include <Parsers/IAST.h>
namespace DB
{
/// It removes Function_if node from AST if condition is constant.
/// TODO: rewrite with InDepthNodeVisitor
class OptimizeIfWithConstantConditionVisitor
{
public:
using Aliases = std::unordered_map<String, ASTPtr>;
OptimizeIfWithConstantConditionVisitor(Aliases & aliases_)
: aliases(aliases_)
{}
void visit(ASTPtr & ast);
private:
Aliases & aliases;
};
}
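
Both new files above implement the same rewrite: an if(<constant>, then, else) call is folded down to the surviving branch, and the alias of the if() node is carried over so references to it stay valid. A compact standalone sketch of that rewrite on a toy tree; the Node type and foldConstantIf below are simplified stand-ins for the real IAST/visitor interface, not the commit's code:

#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Toy AST node: "if" nodes have exactly three children (condition, then, else).
struct Node
{
    std::string name;   // "if", a literal ("0"/"1"), or a column name
    std::string alias;
    std::vector<std::shared_ptr<Node>> children;
};

// Fold if(<constant>, then, else) into the chosen branch, preserving the alias.
void foldConstantIf(std::shared_ptr<Node> & node)
{
    for (auto & child : node->children)
        foldConstantIf(child);

    if (node->name == "if" && node->children.size() == 3
        && (node->children[0]->name == "0" || node->children[0]->name == "1"))
    {
        auto replacement = node->children[0]->name == "1" ? node->children[1] : node->children[2];
        if (!node->alias.empty())
            replacement->alias = node->alias;
        node = replacement;
    }
}

int main()
{
    auto query = std::make_shared<Node>(Node{"if", "x", {
        std::make_shared<Node>(Node{"1", "", {}}),
        std::make_shared<Node>(Node{"a", "", {}}),
        std::make_shared<Node>(Node{"b", "", {}})}});

    foldConstantIf(query);
    assert(query->name == "a" && query->alias == "x"); // if(1, a, b) AS x  ->  a AS x
    return 0;
}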

View File

@ -14,6 +14,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_ELEMENT_IN_AST;
}
static constexpr auto and_function_name = "and";
PredicateExpressionsOptimizer::PredicateExpressionsOptimizer(
@ -400,6 +406,8 @@ ASTs PredicateExpressionsOptimizer::evaluateAsterisk(ASTSelectQuery * select_que
DatabaseAndTableWithAlias database_and_table_name(*database_and_table_ast);
storage = context.getTable(database_and_table_name.database, database_and_table_name.table);
}
else
throw Exception("Logical error: unexpected table expression", ErrorCodes::LOGICAL_ERROR);
const auto block = storage->getSampleBlock();
for (size_t idx = 0; idx < block.columns(); idx++)

View File

@ -14,12 +14,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
using PredicateExpressions = std::vector<ASTPtr>;
using ProjectionWithAlias = std::pair<ASTPtr, String>;
using ProjectionsWithAliases = std::vector<ProjectionWithAlias>;

View File

@ -132,7 +132,7 @@ typename SetVariantsTemplate<Variant>::Type SetVariantsTemplate<Variant>::choose
}
/// If there is one numeric key that fits into 64 bits
if (keys_size == 1 && nested_key_columns[0]->isNumeric())
if (keys_size == 1 && nested_key_columns[0]->isNumeric() && !nested_key_columns[0]->lowCardinality())
{
size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
if (size_of_field == 1)

View File

@ -11,6 +11,7 @@
#include <Interpreters/ExecuteScalarSubqueriesVisitor.h>
#include <Interpreters/PredicateExpressionsOptimizer.h>
#include <Interpreters/ExternalDictionaries.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
@ -34,7 +35,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ALIAS_REQUIRED;
extern const int MULTIPLE_EXPRESSIONS_FOR_ALIAS;
extern const int EMPTY_NESTED_TABLE;
@ -42,141 +42,6 @@ namespace ErrorCodes
extern const int INVALID_JOIN_ON_EXPRESSION;
}
namespace
{
using LogAST = DebugASTLog<false>; /// set to true to enable logs
using Aliases = SyntaxAnalyzerResult::Aliases;
/// Add columns from storage to source_columns list.
void collectSourceColumns(ASTSelectQuery * select_query, const Context & context,
StoragePtr & storage, NamesAndTypesList & source_columns);
/// Translate qualified names such as db.table.column, table.column, table_alias.column to unqualified names.
void translateQualifiedNames(ASTPtr & query, ASTSelectQuery * select_query,
const NameSet & source_columns, const Context & context);
/// For star nodes(`*`), expand them to a list of all columns. For literal nodes, substitute aliases.
void normalizeTree(
ASTPtr & query,
SyntaxAnalyzerResult & result,
const Names & source_columns,
const NameSet & source_columns_set,
const StoragePtr & storage,
const Context & context,
const ASTSelectQuery * select_query,
bool asterisk_left_columns_only);
/// Sometimes we have to calculate more columns in SELECT clause than will be returned from query.
/// This is the case when we have DISTINCT or arrayJoin: we require more columns in SELECT even if we need less columns in result.
void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query, const Names & required_result_columns);
/// Replacing scalar subqueries with constant values.
void executeScalarSubqueries(ASTPtr & query, const ASTSelectQuery * select_query,
const Context & context, size_t subquery_depth);
/// Remove Function_if AST if condition is constant.
void optimizeIfWithConstantCondition(ASTPtr & current_ast, Aliases & aliases);
/// Eliminates injective function calls and constant expressions from group by statement.
void optimizeGroupBy(ASTSelectQuery * select_query, const NameSet & source_columns, const Context & context);
/// Remove duplicate items from ORDER BY.
void optimizeOrderBy(const ASTSelectQuery * select_query);
/// Remove duplicate items from LIMIT BY.
void optimizeLimitBy(const ASTSelectQuery * select_query);
/// Remove duplicated columns from USING(...).
void optimizeUsing(const ASTSelectQuery * select_query);
void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const ASTSelectQuery * select_query,
const Names & source_columns, const NameSet & source_columns_set);
/// Parse JOIN ON expression and collect ASTs for joined columns.
void collectJoinedColumnsFromJoinOnExpr(AnalyzedJoin & analyzed_join, const ASTSelectQuery * select_query,
const NameSet & source_columns, const Context & context);
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery * select_query,
const NameSet & source_columns, const Context & context);
}
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns_,
const Names & required_result_columns,
size_t subquery_depth) const
{
SyntaxAnalyzerResult result;
result.storage = storage;
result.source_columns = source_columns_;
auto * select_query = typeid_cast<ASTSelectQuery *>(query.get());
collectSourceColumns(select_query, context, result.storage, result.source_columns);
const auto & settings = context.getSettingsRef();
Names source_columns_list;
source_columns_list.reserve(result.source_columns.size());
for (const auto & type_name : result.source_columns)
source_columns_list.emplace_back(type_name.name);
NameSet source_columns_set(source_columns_list.begin(), source_columns_list.end());
translateQualifiedNames(query, select_query, source_columns_set, context);
/// Depending on the user's profile, check for the execution rights
/// distributed subqueries inside the IN or JOIN sections and process these subqueries.
InJoinSubqueriesPreprocessor(context).process(select_query);
/// Optimizes logical expressions.
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.value).perform();
/// Creates a dictionary `aliases`: alias -> ASTPtr
{
LogAST log;
QueryAliasesVisitor::Data query_aliases_data{result.aliases};
QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);
}
/// Common subexpression elimination. Rewrite rules.
normalizeTree(query, result, source_columns_list, source_columns_set, result.storage,
context, select_query, settings.asterisk_left_columns_only != 0);
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
/// Must be after 'normalizeTree' (after expanding aliases, for aliases not get lost)
/// and before 'executeScalarSubqueries', 'analyzeAggregation', etc. to avoid excessive calculations.
removeUnneededColumnsFromSelectClause(select_query, required_result_columns);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, select_query, context, subquery_depth);
/// Optimize if with constant condition after constants was substituted instead of sclalar subqueries.
optimizeIfWithConstantCondition(query, result.aliases);
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, source_columns_set, context);
/// Remove duplicate items from ORDER BY.
optimizeOrderBy(select_query);
// Remove duplicated elements from LIMIT BY clause.
optimizeLimitBy(select_query);
/// Remove duplicated columns from USING(...).
optimizeUsing(select_query);
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns(query, result, select_query, source_columns_list, source_columns_set);
/// Push the predicate expression down to the subqueries.
result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
collectJoinedColumns(result.analyzed_join, select_query, source_columns_set, context);
return std::make_shared<const SyntaxAnalyzerResult>(result);
}
void removeDuplicateColumns(NamesAndTypesList & columns)
{
std::set<String> names;
@ -192,15 +57,12 @@ void removeDuplicateColumns(NamesAndTypesList & columns)
namespace
{
void collectSourceColumns(ASTSelectQuery * select_query, const Context & context,
StoragePtr & storage, NamesAndTypesList & source_columns)
{
if (!storage && select_query)
{
if (auto db_and_table = getDatabaseAndTable(*select_query, 0))
storage = context.tryGetTable(db_and_table->database, db_and_table->table);
}
using LogAST = DebugASTLog<false>; /// set to true to enable logs
/// Add columns from storage to source_columns list.
void collectSourceColumns(ASTSelectQuery * select_query, StoragePtr storage, NamesAndTypesList & source_columns)
{
if (storage)
{
auto physical_columns = storage->getColumns().getAllPhysical();
@ -219,10 +81,11 @@ void collectSourceColumns(ASTSelectQuery * select_query, const Context & context
removeDuplicateColumns(source_columns);
}
/// Translate qualified names such as db.table.column, table.column, table_alias.column to unqualified names.
void translateQualifiedNames(ASTPtr & query, ASTSelectQuery * select_query,
const NameSet & source_columns, const Context & context)
{
if (!select_query || !select_query->tables || select_query->tables->children.empty())
if (!select_query->tables || select_query->tables->children.empty())
return;
std::vector<DatabaseAndTableWithAlias> tables = getDatabaseAndTables(*select_query, context.getCurrentDatabase());
@ -233,6 +96,7 @@ void translateQualifiedNames(ASTPtr & query, ASTSelectQuery * select_query,
visitor.visit(query);
}
/// For star nodes(`*`), expand them to a list of all columns. For literal nodes, substitute aliases.
void normalizeTree(
ASTPtr & query,
SyntaxAnalyzerResult & result,
@ -297,11 +161,10 @@ bool hasArrayJoin(const ASTPtr & ast)
return false;
}
/// Sometimes we have to calculate more columns in SELECT clause than will be returned from query.
/// This is the case when we have DISTINCT or arrayJoin: we require more columns in SELECT even if we need less columns in result.
void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query, const Names & required_result_columns)
{
if (!select_query)
return;
if (required_result_columns.empty())
return;
@ -335,121 +198,12 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
elements = std::move(new_elements);
}
void executeScalarSubqueries(ASTPtr & query, const ASTSelectQuery * select_query,
const Context & context, size_t subquery_depth)
/// Replacing scalar subqueries with constant values.
void executeScalarSubqueries(ASTPtr & query, const Context & context, size_t subquery_depth)
{
LogAST log;
if (!select_query)
{
ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth};
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
}
else
{
for (auto & child : query->children)
{
/// Do not go to FROM, JOIN, UNION.
if (!typeid_cast<const ASTTableExpression *>(child.get())
&& !typeid_cast<const ASTSelectQuery *>(child.get()))
{
ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth};
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(child);
}
}
}
}
bool tryExtractConstValueFromCondition(const ASTPtr & condition, bool & value)
{
/// numeric constant in condition
if (const ASTLiteral * literal = typeid_cast<ASTLiteral *>(condition.get()))
{
if (literal->value.getType() == Field::Types::Int64 ||
literal->value.getType() == Field::Types::UInt64)
{
value = literal->value.get<Int64>();
return true;
}
}
/// cast of numeric constant in condition to UInt8
if (const ASTFunction * function = typeid_cast<ASTFunction * >(condition.get()))
{
if (function->name == "CAST")
{
if (ASTExpressionList * expr_list = typeid_cast<ASTExpressionList *>(function->arguments.get()))
{
const ASTPtr & type_ast = expr_list->children.at(1);
if (const ASTLiteral * type_literal = typeid_cast<ASTLiteral *>(type_ast.get()))
{
if (type_literal->value.getType() == Field::Types::String &&
type_literal->value.get<std::string>() == "UInt8")
return tryExtractConstValueFromCondition(expr_list->children.at(0), value);
}
}
}
}
return false;
}
void optimizeIfWithConstantCondition(ASTPtr & current_ast, Aliases & aliases)
{
if (!current_ast)
return;
for (ASTPtr & child : current_ast->children)
{
auto * function_node = typeid_cast<ASTFunction *>(child.get());
if (!function_node || function_node->name != "if")
{
optimizeIfWithConstantCondition(child, aliases);
continue;
}
optimizeIfWithConstantCondition(function_node->arguments, aliases);
auto * args = typeid_cast<ASTExpressionList *>(function_node->arguments.get());
if (args->children.size() != 3)
throw Exception("Wrong number of arguments for function 'if' (" + toString(args->children.size()) + " instead of 3)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTPtr condition_expr = args->children[0];
ASTPtr then_expr = args->children[1];
ASTPtr else_expr = args->children[2];
bool condition;
if (tryExtractConstValueFromCondition(condition_expr, condition))
{
ASTPtr replace_ast = condition ? then_expr : else_expr;
ASTPtr child_copy = child;
String replace_alias = replace_ast->tryGetAlias();
String if_alias = child->tryGetAlias();
if (replace_alias.empty())
{
replace_ast->setAlias(if_alias);
child = replace_ast;
}
else
{
/// Only copy of one node is required here.
/// But IAST has only method for deep copy of subtree.
/// This can be a reason of performance degradation in case of deep queries.
ASTPtr replace_ast_deep_copy = replace_ast->clone();
replace_ast_deep_copy->setAlias(if_alias);
child = replace_ast_deep_copy;
}
if (!if_alias.empty())
{
auto alias_it = aliases.find(if_alias);
if (alias_it != aliases.end() && alias_it->second.get() == child_copy.get())
alias_it->second = child;
}
}
}
ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth};
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
}
/** Calls to these functions in the GROUP BY statement would be
@ -491,9 +245,10 @@ const std::unordered_set<String> possibly_injective_function_names
"dictGetDateTime"
};
/// Eliminates injective function calls and constant expressions from group by statement.
void optimizeGroupBy(ASTSelectQuery * select_query, const NameSet & source_columns, const Context & context)
{
if (!(select_query && select_query->group_expression_list))
if (!select_query->group_expression_list)
return;
const auto is_literal = [] (const ASTPtr & ast)
@ -594,9 +349,10 @@ void optimizeGroupBy(ASTSelectQuery * select_query, const NameSet & source_colum
}
}
/// Remove duplicate items from ORDER BY.
void optimizeOrderBy(const ASTSelectQuery * select_query)
{
if (!(select_query && select_query->order_expression_list))
if (!select_query->order_expression_list)
return;
/// Make unique sorting conditions.
@ -620,9 +376,10 @@ void optimizeOrderBy(const ASTSelectQuery * select_query)
elems = unique_elems;
}
/// Remove duplicate items from LIMIT BY.
void optimizeLimitBy(const ASTSelectQuery * select_query)
{
if (!(select_query && select_query->limit_by_expression_list))
if (!select_query->limit_by_expression_list)
return;
std::set<String> elems_set;
@ -641,11 +398,9 @@ void optimizeLimitBy(const ASTSelectQuery * select_query)
elems = unique_elems;
}
/// Remove duplicated columns from USING(...).
void optimizeUsing(const ASTSelectQuery * select_query)
{
if (!select_query)
return;
auto node = const_cast<ASTTablesInSelectQueryElement *>(select_query->join());
if (!node)
return;
@ -676,9 +431,6 @@ void optimizeUsing(const ASTSelectQuery * select_query)
void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const ASTSelectQuery * select_query,
const Names & source_columns, const NameSet & source_columns_set)
{
if (!select_query)
return;
ASTPtr array_join_expression_list = select_query->array_join_expression_list();
if (array_join_expression_list)
{
@ -740,6 +492,7 @@ void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const
}
}
/// Parse JOIN ON expression and collect ASTs for joined columns.
void collectJoinedColumnsFromJoinOnExpr(AnalyzedJoin & analyzed_join, const ASTSelectQuery * select_query,
const NameSet & source_columns, const Context & context)
{
@ -899,12 +652,10 @@ void collectJoinedColumnsFromJoinOnExpr(AnalyzedJoin & analyzed_join, const ASTS
add_columns_from_equals_expr(table_join.on_expression);
}
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery * select_query,
const NameSet & source_columns, const Context & context)
{
if (!select_query)
return;
const ASTTablesInSelectQueryElement * node = select_query->join();
if (!node)
@ -969,4 +720,94 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery * s
}
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns_,
const Names & required_result_columns,
StoragePtr storage) const
{
auto * select_query = typeid_cast<ASTSelectQuery *>(query.get());
if (!storage && select_query)
{
if (auto db_and_table = getDatabaseAndTable(*select_query, 0))
storage = context.tryGetTable(db_and_table->database, db_and_table->table);
}
SyntaxAnalyzerResult result;
result.storage = storage;
result.source_columns = source_columns_;
collectSourceColumns(select_query, result.storage, result.source_columns);
const auto & settings = context.getSettingsRef();
Names source_columns_list;
source_columns_list.reserve(result.source_columns.size());
for (const auto & type_name : result.source_columns)
source_columns_list.emplace_back(type_name.name);
NameSet source_columns_set(source_columns_list.begin(), source_columns_list.end());
if (select_query)
{
translateQualifiedNames(query, select_query, source_columns_set, context);
/// Depending on the user's profile, check for the execution rights
/// distributed subqueries inside the IN or JOIN sections and process these subqueries.
InJoinSubqueriesPreprocessor(context).process(select_query);
/// Optimizes logical expressions.
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.value).perform();
}
/// Creates a dictionary `aliases`: alias -> ASTPtr
{
LogAST log;
QueryAliasesVisitor::Data query_aliases_data{result.aliases};
QueryAliasesVisitor(query_aliases_data, log.stream()).visit(query);
}
/// Common subexpression elimination. Rewrite rules.
normalizeTree(query, result, source_columns_list, source_columns_set, result.storage,
context, select_query, settings.asterisk_left_columns_only != 0);
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
/// Must be after 'normalizeTree' (after expanding aliases, for aliases not get lost)
/// and before 'executeScalarSubqueries', 'analyzeAggregation', etc. to avoid excessive calculations.
if (select_query)
removeUnneededColumnsFromSelectClause(select_query, required_result_columns);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, context, subquery_depth);
/// Optimize if with constant condition after constants was substituted instead of sclalar subqueries.
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
if (select_query)
{
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, source_columns_set, context);
/// Remove duplicate items from ORDER BY.
optimizeOrderBy(select_query);
/// Remove duplicated elements from LIMIT BY clause.
optimizeLimitBy(select_query);
/// Remove duplicated columns from USING(...).
optimizeUsing(select_query);
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns(query, result, select_query, source_columns_list, source_columns_set);
/// Push the predicate expression down to the subqueries.
result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
collectJoinedColumns(result.analyzed_join, select_query, source_columns_set, context);
}
return std::make_shared<const SyntaxAnalyzerResult>(result);
}
}

View File

@ -54,16 +54,20 @@ using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
class SyntaxAnalyzer
{
public:
SyntaxAnalyzer(const Context & context, StoragePtr storage) : context(context), storage(std::move(storage)) {}
SyntaxAnalyzer(const Context & context_, size_t subquery_depth_ = 0)
: context(context_)
, subquery_depth(subquery_depth_)
{}
SyntaxAnalyzerResultPtr analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns_,
const Names & required_result_columns = {},
size_t subquery_depth = 0) const;
StoragePtr storage = {}) const;
private:
const Context & context;
StoragePtr storage;
size_t subquery_depth;
};
}

View File

@ -31,7 +31,7 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
{
NamesAndTypesList source_columns = {{ "_dummy", std::make_shared<DataTypeUInt8>() }};
auto ast = node->clone();
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(ast, source_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(ast, source_columns);
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(ast, syntax_result, context).getConstActions();
/// There must be at least one column in the block so that it knows the number of rows.

View File

@ -48,7 +48,7 @@ void evaluateMissingDefaults(Block & block,
if (!save_unneeded_columns)
{
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block);
return;
}
@ -57,7 +57,7 @@ void evaluateMissingDefaults(Block & block,
* we are going to operate on a copy instead of the original block */
Block copy_block{block};
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(copy_block);
/// move evaluated columns to the original block, materializing them at the same time

View File

@ -107,13 +107,13 @@ int main(int argc, char ** argv)
AggregateFunctionPtr func_avg = factory.get("avg", data_types_uint64);
AggregateFunctionPtr func_uniq = factory.get("uniq", data_types_uint64);
#define INIT \
{ \
value.resize(3); \
\
value[0] = func_count.get();\
value[1] = func_avg.get(); \
value[2] = func_uniq.get(); \
#define INIT \
{ \
value.resize(3); \
\
value[0] = func_count.get(); \
value[1] = func_avg.get(); \
value[2] = func_uniq.get(); \
}
INIT
@ -162,7 +162,8 @@ int main(int argc, char ** argv)
map.emplace(data[i], it, inserted);
if (inserted)
{
new(&it->second) Value(std::move(value));
new(&it->second) Value;
std::swap(it->second, value);
INIT
}
}
@ -192,7 +193,8 @@ int main(int argc, char ** argv)
map.emplace(data[i], it, inserted);
if (inserted)
{
new(&it->second) Value(std::move(value));
new(&it->second) Value;
std::swap(it->second, value);
INIT
}
}
@ -223,7 +225,8 @@ int main(int argc, char ** argv)
map.emplace(data[i], it, inserted);
if (inserted)
{
new(&it->second) Value(std::move(value));
new(&it->second) Value;
std::swap(it->second, value);
INIT
}
}
@ -248,7 +251,7 @@ int main(int argc, char ** argv)
std::unordered_map<Key, Value, DefaultHash<Key>>::iterator it;
for (size_t i = 0; i < n; ++i)
{
it = map.insert(std::make_pair(data[i], std::move(value))).first;
it = map.insert(std::make_pair(data[i], value)).first;
INIT
}
@ -269,7 +272,7 @@ int main(int argc, char ** argv)
map.set_empty_key(-1ULL);
for (size_t i = 0; i < n; ++i)
{
it = map.insert(std::make_pair(data[i], std::move(value))).first;
it = map.insert(std::make_pair(data[i], value)).first;
INIT
}
@ -289,7 +292,7 @@ int main(int argc, char ** argv)
GOOGLE_NAMESPACE::sparse_hash_map<Key, Value, DefaultHash<Key>>::iterator it;
for (size_t i = 0; i < n; ++i)
{
map.insert(std::make_pair(data[i], std::move(value)));
map.insert(std::make_pair(data[i], value));
INIT
}

View File

@ -398,7 +398,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
{
const auto & default_expression = default_column.second.expression;
ASTPtr query = default_expression;
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, all_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
const auto actions = ExpressionAnalyzer(query, syntax_result, context).getActions(true);
const auto required_columns = actions->getRequiredColumns();
@ -473,7 +473,7 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
}
ASTPtr query = default_expr_list;
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, all_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns);
const auto actions = ExpressionAnalyzer(query, syntax_result, context).getActions(true);
const auto block = actions->getSampleBlock();

View File

@ -304,7 +304,7 @@ BlockInputStreams StorageKafka::read(
if (num_created_consumers == 0)
return BlockInputStreams();
const size_t stream_count = std::min(num_streams, num_created_consumers);
const size_t stream_count = std::min(size_t(num_streams), num_created_consumers);
BlockInputStreams streams;
streams.reserve(stream_count);

View File

@ -116,6 +116,7 @@ MergeTreeData::MergeTreeData(
data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
{
setPrimaryKeyAndColumns(order_by_ast_, primary_key_ast_, columns_);
setSkipIndexes(indexes_ast_);
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical());
@ -128,7 +129,7 @@ MergeTreeData::MergeTreeData(
&& !attach && !settings.compatibility_allow_sampling_expression_not_in_primary_key) /// This is for backward compatibility.
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
auto syntax = SyntaxAnalyzer(global_context, {}).analyze(sample_by_ast, getColumns().getAllPhysical());
auto syntax = SyntaxAnalyzer(global_context).analyze(sample_by_ast, getColumns().getAllPhysical());
columns_required_for_sampling = ExpressionAnalyzer(sample_by_ast, syntax, global_context)
.getRequiredSourceColumns();
}
@ -189,8 +190,6 @@ MergeTreeData::MergeTreeData(
throw Exception(
"MergeTree data format version on disk doesn't support custom partitioning",
ErrorCodes::METADATA_MISMATCH);
setSkipIndexes(indexes_ast_);
}
@ -286,7 +285,7 @@ void MergeTreeData::setPrimaryKeyAndColumns(
if (!added_key_column_expr_list->children.empty())
{
auto syntax = SyntaxAnalyzer(global_context, {}).analyze(added_key_column_expr_list, all_columns);
auto syntax = SyntaxAnalyzer(global_context).analyze(added_key_column_expr_list, all_columns);
Names used_columns = ExpressionAnalyzer(added_key_column_expr_list, syntax, global_context)
.getRequiredSourceColumns();
@ -309,7 +308,7 @@ void MergeTreeData::setPrimaryKeyAndColumns(
}
}
auto new_sorting_key_syntax = SyntaxAnalyzer(global_context, {}).analyze(new_sorting_key_expr_list, all_columns);
auto new_sorting_key_syntax = SyntaxAnalyzer(global_context).analyze(new_sorting_key_expr_list, all_columns);
auto new_sorting_key_expr = ExpressionAnalyzer(new_sorting_key_expr_list, new_sorting_key_syntax, global_context)
.getActions(false);
auto new_sorting_key_sample =
@ -318,7 +317,7 @@ void MergeTreeData::setPrimaryKeyAndColumns(
checkKeyExpression(*new_sorting_key_expr, new_sorting_key_sample, "Sorting");
auto new_primary_key_syntax = SyntaxAnalyzer(global_context, {}).analyze(new_primary_key_expr_list, all_columns);
auto new_primary_key_syntax = SyntaxAnalyzer(global_context).analyze(new_primary_key_expr_list, all_columns);
auto new_primary_key_expr = ExpressionAnalyzer(new_primary_key_expr_list, new_primary_key_syntax, global_context)
.getActions(false);
@ -356,27 +355,32 @@ void MergeTreeData::setSkipIndexes(const ASTPtr & indexes_asts, bool only_check)
{
return;
}
MergeTreeIndexes new_indexes;
std::set<String> names;
auto index_list = std::dynamic_pointer_cast<ASTExpressionList>(indexes_asts);
for (const auto &index_ast : index_list->children)
{
new_indexes.push_back(
std::move(MergeTreeIndexFactory::instance().get(
*this,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast),
global_context)));
if (names.find(new_indexes.back()->name) != names.end())
{
throw Exception(
"Index with name `" + new_indexes.back()->name + "` already exsists",
ErrorCodes::LOGICAL_ERROR);
}
names.insert(new_indexes.back()->name);
}
if (!only_check)
{
indexes.clear();
std::set<String> names;
auto index_list = std::dynamic_pointer_cast<ASTExpressionList>(indexes_asts);
for (const auto &index_ast : index_list->children)
{
indexes.push_back(
std::move(MergeTreeIndexFactory::instance().get(
*this,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast),
global_context)));
if (names.find(indexes.back()->name) != names.end())
{
throw Exception(
"Index with name `" + indexes.back()->name + "` already exsists",
ErrorCodes::LOGICAL_ERROR);
}
names.insert(indexes.back()->name);
}
skip_indexes_ast = indexes_asts;
indexes = std::move(new_indexes);
}
}
@ -411,7 +415,7 @@ void MergeTreeData::initPartitionKey()
return;
{
auto syntax_result = SyntaxAnalyzer(global_context, {}).analyze(partition_key_expr_list, getColumns().getAllPhysical());
auto syntax_result = SyntaxAnalyzer(global_context).analyze(partition_key_expr_list, getColumns().getAllPhysical());
partition_key_expr = ExpressionAnalyzer(partition_key_expr_list, syntax_result, global_context).getActions(false);
}
@ -1056,6 +1060,13 @@ void MergeTreeData::checkAlter(const AlterCommands & commands)
columns_alter_forbidden.insert(col);
}
for (auto index : indexes)
{
/// TODO: some special error telling about "drop index"
for (const String & col : index->expr->getRequiredColumns())
columns_alter_forbidden.insert(col);
}
if (sorting_key_expr)
{
for (const ExpressionAction & action : sorting_key_expr->getActions())
@ -1111,6 +1122,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands)
}
setPrimaryKeyAndColumns(new_order_by_ast, new_primary_key_ast, new_columns, /* only_check = */ true);
setSkipIndexes(skip_indexes_ast, /* only_check = */ true);
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
@ -2304,7 +2316,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartS
for (auto state : affordable_states)
{
buf = std::move(res);
std::swap(buf, res);
res.clear();
auto range = getDataPartsStateRange(state);

View File
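
The last MergeTreeData.cpp hunk above replaces a move-assignment with std::swap when reusing a scratch vector inside a loop. A minimal, self-contained sketch of that idiom (names are illustrative, not taken from getDataPartsVector): the swap hands the accumulated elements to buf and leaves buf's old storage in res, which clear() then empties while keeping its capacity for the next pass, instead of relying on the unspecified state of a moved-from vector.

#include <cassert>
#include <vector>

int main()
{
    std::vector<int> res = {1, 2, 3};   // elements accumulated on the previous pass (illustrative data)
    std::vector<int> buf;

    // Buffer-reuse idiom from the hunk above: swap, then clear the vector that
    // will be refilled, so its allocation can be reused on the next iteration.
    std::swap(buf, res);
    res.clear();

    assert(buf.size() == 3);
    assert(res.empty());
    return 0;
}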

@ -583,6 +583,7 @@ public:
/// Secondary (data skipping) indexes for MergeTree
MergeTreeIndexes indexes;
ASTPtr skip_indexes_ast;
/// Names of columns for primary key + secondary sorting columns.
Names sorting_key_columns;

View File

@ -492,7 +492,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
}
ASTPtr query = filter_function;
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, available_real_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, available_real_columns);
filter_expression = ExpressionAnalyzer(filter_function, syntax_result, context).getActions(false);
/// Add columns needed for `sample_by_ast` to `column_names_to_read`.
@ -861,7 +861,7 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition(
arguments->children.push_back(one);
ASTPtr query = function;
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, data.getColumns().getAllPhysical());
auto syntax_result = SyntaxAnalyzer(context).analyze(query, data.getColumns().getAllPhysical());
out_expression = ExpressionAnalyzer(query, syntax_result, context).getActions(false);
out_column = function->getColumnName();
}

View File

@ -86,7 +86,6 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
/// We will keep logs after and including this threshold.
UInt64 min_saved_log_pointer = std::numeric_limits<UInt64>::max();
UInt64 min_log_pointer_lost_candidate = std::numeric_limits<UInt64>::max();
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
@ -118,7 +117,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
UInt32 log_pointer = 0;
UInt64 log_pointer = 0;
if (!pointer.empty())
log_pointer = parse<UInt64>(pointer);
@ -190,7 +189,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
for (const String & replica : recovering_replicas)
{
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
UInt32 log_pointer = 0;
UInt64 log_pointer = 0;
if (!pointer.empty())
log_pointer = parse<UInt64>(pointer);
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);

View File

@ -648,7 +648,7 @@ ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsFo
/// Let's find the action to merge this part with others. Let's remember others.
StringSet parts_for_merge;
Queue::iterator merge_entry;
Queue::iterator merge_entry = queue.end();
for (Queue::iterator it = queue.begin(); it != queue.end(); ++it)
{
if ((*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART)

View File

@ -44,6 +44,8 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
if (data.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
partition_key = formattedAST(MergeTreeData::extractKeyExpressionList(data.partition_by_ast));
skip_indexes = formattedAST(data.skip_indexes_ast);
}
void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
@ -64,6 +66,9 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
if (!sorting_key.empty())
out << "sorting key: " << sorting_key << "\n";
if (!skip_indexes.empty())
out << "skip indexes: " << skip_indexes << "\n";
}
String ReplicatedMergeTreeTableMetadata::toString() const
@ -93,6 +98,9 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
if (checkString("sorting key: ", in))
in >> sorting_key >> "\n";
if (checkString("skip indexes: ", in))
in >> skip_indexes >> "\n";
}
ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const String & s)
@ -175,6 +183,12 @@ ReplicatedMergeTreeTableMetadata::checkAndFindDiff(const ReplicatedMergeTreeTabl
ErrorCodes::METADATA_MISMATCH);
}
if (skip_indexes != from_zk.skip_indexes)
throw Exception("Existing table metadata in ZooKeeper differs in skip indexes."
" Stored in ZooKeeper: " + from_zk.skip_indexes +
", local: " + skip_indexes,
ErrorCodes::METADATA_MISMATCH);
return diff;
}

View File
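
The ReplicatedMergeTreeTableMetadata hunks above add an optional "skip indexes: ..." line to the table metadata kept in ZooKeeper: it is written only when non-empty and is accepted conditionally on read, presumably so metadata produced before this change still parses. A minimal sketch of that optional-line pattern, using plain standard-library streams instead of the project's ReadBuffer/WriteBuffer helpers:

#include <iostream>
#include <sstream>
#include <string>

int main()
{
    // Writing: emit the line only when the field is non-empty, as in write().
    const std::string skip_indexes = "idx minmax GRANULARITY 3";  // illustrative value
    std::ostringstream out;
    if (!skip_indexes.empty())
        out << "skip indexes: " << skip_indexes << "\n";

    // Reading: accept metadata with or without the optional line, as in read().
    std::istringstream in(out.str());
    std::string line, parsed;
    const std::string prefix = "skip indexes: ";
    if (std::getline(in, line) && line.compare(0, prefix.size(), prefix) == 0)
        parsed = line.substr(prefix.size());

    std::cout << parsed << std::endl;
    return 0;
}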

@ -25,6 +25,7 @@ struct ReplicatedMergeTreeTableMetadata
MergeTreeDataFormatVersion data_format_version;
String partition_key;
String sorting_key;
String skip_indexes;
ReplicatedMergeTreeTableMetadata() = default;
explicit ReplicatedMergeTreeTableMetadata(const MergeTreeData & data);

View File

@ -170,7 +170,7 @@ StorageDistributed::~StorageDistributed() = default;
static ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_key, const Context & context, NamesAndTypesList columns, bool project)
{
ASTPtr query = sharding_key;
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns);
return ExpressionAnalyzer(query, syntax_result, context).getActions(project);
}

View File

@ -26,6 +26,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <ext/range.h>
#include <algorithm>
#include <Parsers/ASTFunction.h>
#include <Parsers/queryToString.h>
@ -219,7 +220,7 @@ BlockInputStreams StorageMerge::read(
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
size_t current_streams = std::min(current_need_streams, remaining_streams);
remaining_streams -= current_streams;
current_streams = std::max(1, current_streams);
current_streams = std::max(size_t(1), current_streams);
StoragePtr storage = it->first;
TableStructureReadLockPtr struct_lock = it->second;
@ -452,7 +453,7 @@ void StorageMerge::convertingSourceStream(const Block & header, const Context &
NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
NameAndTypePair virtual_column = getColumn("_table");
source_columns.insert(source_columns.end(), virtual_column);
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(where_expression, source_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns);
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, context}.getActions(false, false);
Names required_columns = actions->getRequiredColumns();

View File

@ -244,6 +244,7 @@ void StorageMergeTree::alter(
/// Reinitialize primary key because primary key column types might have changed.
data.setPrimaryKeyAndColumns(new_order_by_ast, new_primary_key_ast, new_columns);
data.setSkipIndexes(data.skip_indexes_ast);
for (auto & transaction : transactions)
transaction->commit();

View File

@ -461,6 +461,7 @@ void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_column
/// Even if the primary/sorting keys didn't change we must reinitialize it
/// because primary key column types might have changed.
data.setPrimaryKeyAndColumns(new_order_by_ast, new_primary_key_ast, new_columns);
data.setSkipIndexes(data.skip_indexes_ast);
}

View File

@ -157,7 +157,7 @@ void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & c
return;
/// Let's analyze and calculate the expression.
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(expression_ast, block.getNamesAndTypesList());
auto syntax_result = SyntaxAnalyzer(context).analyze(expression_ast, block.getNamesAndTypesList());
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
ExpressionActionsPtr actions = analyzer.getActions(false);

View File

@ -1,6 +1,7 @@
#include "getStructureOfRemoteTable.h"
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
@ -54,7 +55,10 @@ ColumnsDescription getStructureOfRemoteTable(
ColumnsDescription res;
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), context);
auto new_context = ClusterProxy::removeUserRestrictionsFromSettings(context, context.getSettingsRef());
/// Execute remote query without restrictions (because it's not real user query, but part of implementation)
auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), new_context);
input->setPoolMode(PoolMode::GET_ONE);
if (!table_func_ptr)
input->setMainTable(QualifiedTableName{database, table});

View File

@ -4,9 +4,6 @@ target_link_libraries (system_numbers PRIVATE dbms clickhouse_storages_system cl
add_executable (storage_log storage_log.cpp)
target_link_libraries (storage_log PRIVATE dbms)
add_executable (seek_speed_test seek_speed_test.cpp)
target_link_libraries (seek_speed_test PRIVATE dbms)
add_executable (part_checker part_checker.cpp)
target_link_libraries (part_checker PRIVATE dbms)

View File

@ -1,68 +0,0 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Common/Stopwatch.h>
#include <Poco/File.h>
#include <iostream>
#include <iomanip>
#include <vector>
#include <algorithm>
/** We test the hypothesis that skipping unnecessary parts of seek-forward never degrades overall read speed.
* Before the measurements, it is desirable to discard disk cache: `echo 3 > /proc/sys/vm/drop_caches`.
*
* Result: yes, even frequent relatively short seek forward does not worsen anything on all tested parameters
* - 1MiB of data, 16 0 0 16 vs 16 16 32 16
* - 1GiB of data, 1048576 0 0 vs 1048576 512 1024 vs 1048576 1048576 1048576
* - 1GiB of data, 1024 0 0 vs 1024 512 1024
*/
int main(int argc, const char ** argv)
{
if (argc < 5 || argc > 6)
{
std::cerr << "Usage:\n"
<< argv[0] << " file bytes_in_block min_skip_bytes max_skip_bytes [buffer_size]" << std::endl;
return 0;
}
int block = atoi(argv[2]);
int min_skip = atoi(argv[3]);
int max_skip = atoi(argv[4]);
size_t buf_size = argc <= 5 ? DBMS_DEFAULT_BUFFER_SIZE : static_cast<size_t>(atoi(argv[5]));
UInt64 size = Poco::File(argv[1]).getSize();
UInt64 pos = 0;
DB::ReadBufferFromFile in(argv[1], buf_size);
auto buf = std::make_unique<char[]>(block);
int checksum = 0;
UInt64 bytes_read = 0;
Stopwatch watch;
while (!in.eof())
{
UInt64 len = static_cast<UInt64>(rand() % (max_skip - min_skip + 1) + min_skip);
len = std::min(len, size - pos);
off_t seek_res = in.seek(len, SEEK_CUR);
pos += len;
if (seek_res != static_cast<off_t>(pos))
{
std::cerr << "Unexpected seek return value: " << seek_res << "; expeted " << pos << ", seeking by " << len << std::endl;
return 1;
}
len = std::min(static_cast<UInt64>(block), size - pos);
in.read(buf.get(), len);
checksum += buf[0] + buf[block - 1];
pos += len;
bytes_read += len;
}
watch.stop();
std::cout << checksum << std::endl; /// don't optimize
std::cout << "Read " << bytes_read << " out of " << size << " bytes in "
<< std::setprecision(4) << watch.elapsedSeconds() << " seconds ("
<< bytes_read / watch.elapsedSeconds() / 1000000 << " MB/sec.)" << std::endl;
return 0;
}

View File

@ -28,7 +28,7 @@ static void replaceConstFunction(IAST & node, const Context & context, const Nam
{
NamesAndTypesList source_columns = all_columns;
ASTPtr query = function->ptr();
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(query, source_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(query, source_columns);
auto result_block = KeyCondition::getBlockWithConstants(query, syntax_result, context);
if (!result_block.has(child->getColumnName()))
return;
@ -92,7 +92,7 @@ String transformQueryForExternalDatabase(
const Context & context)
{
auto clone_query = query.clone();
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(clone_query, available_columns);
auto syntax_result = SyntaxAnalyzer(context).analyze(clone_query, available_columns);
ExpressionAnalyzer analyzer(clone_query, syntax_result, context);
const Names & used_columns = analyzer.getRequiredSourceColumns();

View File

@ -34,7 +34,7 @@ StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_function, const
res->startup();
return res;
}
throw new Exception("Table function 'numbers' requires 'limit' or 'offset, limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Table function 'numbers' requires 'limit' or 'offset, limit'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
void registerTableFunctionNumbers(TableFunctionFactory & factory)

View File

@ -0,0 +1,38 @@
<yandex>
<profiles>
<default>
<max_memory_usage>10000000000</max_memory_usage>
<use_uncompressed_cache>0</use_uncompressed_cache>
<load_balancing>random</load_balancing>
</default>
<good>
<max_memory_usage>10000000000</max_memory_usage>
<use_uncompressed_cache>0</use_uncompressed_cache>
<load_balancing>random</load_balancing>
<max_concurrent_queries_for_user>2</max_concurrent_queries_for_user>
</good>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<good>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>good</profile>
<quota>default</quota>
</good>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,38 @@
import time
import pytest
from multiprocessing.dummy import Pool
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', user_configs=['configs/user_restrictions.xml'])
node2 = cluster.add_instance('node2', user_configs=['configs/user_restrictions.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node1.query("create table nums (number UInt64) ENGINE = MergeTree() order by tuple()")
node1.query("insert into nums values(0),(1)")
yield cluster
finally:
cluster.shutdown()
def test_exception_message(started_cluster):
assert node1.query("select number from nums order by number") == "0\n1\n"
def node_busy(_):
for i in xrange(10):
node1.query("select sleep(2)", user='default')
busy_pool = Pool(3)
busy_pool.map_async(node_busy, xrange(3))
time.sleep(1) # wait a little until polling starts
try:
assert node2.query("select number from remote('node1', 'default', 'nums')", user='good') == "0\n1\n"
except Exception as ex:
print ex.message
assert False, "Exception thrown while max_concurrent_queries_for_user is not exceeded"

View File

@ -0,0 +1,123 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122

View File

@ -0,0 +1,7 @@
set allow_experimental_low_cardinality_type = 1;
drop table if exists test.lc;
create table test.lc (val LowCardinality(UInt64)) engine = MergeTree order by val;
insert into test.lc select number % 123 from system.numbers limit 100000;
select distinct(val) from test.lc order by val;
drop table if exists test.lc;

View File

@ -0,0 +1,7 @@
drop table if exists test.lc;
create table test.lc (names Array(LowCardinality(String))) engine=MergeTree order by tuple();
insert into test.lc values ([]);
insert into test.lc select emptyArrayString();
select * from test.lc;
drop table if exists test.lc;

View File

@ -1,4 +1,6 @@
FROM ubuntu:18.10
FROM ubuntu:18.04
RUN echo "deb [trusted=yes] http://apt.llvm.org/bionic/ llvm-toolchain-bionic-7 main" >> /etc/apt/sources.list
RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \

View File

@ -1,8 +1,10 @@
FROM ubuntu:18.10
FROM ubuntu:18.04
RUN apt-get update -y \
RUN echo "deb [trusted=yes] http://apt.llvm.org/bionic/ llvm-toolchain-bionic-7 main" >> /etc/apt/sources.list
RUN apt-get --allow-unauthenticated update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
apt-get --allow-unauthenticated install --yes --no-install-recommends \
bash \
fakeroot \
cmake \
@ -33,8 +35,8 @@ RUN apt-get update -y \
devscripts \
debhelper \
git \
libc++abi-dev \
libc++-dev \
libc++abi-dev \
libboost-program-options-dev \
libboost-system-dev \
libboost-filesystem-dev \

View File

@ -25,6 +25,8 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY zookeeper.xml /etc/clickhouse-server/config.d/zookeeper.xml
COPY listen.xml /etc/clickhouse-server/config.d/listen.xml
COPY part_log.xml /etc/clickhouse-server/config.d/part_log.xml
COPY log_queries.xml /etc/clickhouse-server/users.d/log_queries.xml
CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-server_*.deb; \

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
</yandex>

View File

@ -0,0 +1,8 @@
<yandex>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</part_log>
</yandex>

View File

@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM ubuntu:18.10
RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \
@ -20,6 +20,8 @@ RUN apt-get update -y \
telnet
COPY ./stress /stress
COPY log_queries.xml /etc/clickhouse-server/users.d/log_queries.xml
COPY part_log.xml /etc/clickhouse-server/config.d/part_log.xml
CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-server_*.deb; \

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
</yandex>

View File

@ -0,0 +1,8 @@
<yandex>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</part_log>
</yandex>

View File

@ -18,7 +18,7 @@ mkdocs==1.0.4
Pygments==2.2.0
python-slugify==1.2.6
pytz==2017.3
PyYAML==3.12
PyYAML==4.2b1
recommonmark==0.4.0
requests==2.21.0
singledispatch==3.4.0.3

View File

@ -1,6 +1,5 @@
#pragma once
#include <cstdint>
#include <cstddef>
using Int8 = int8_t;
using Int16 = int16_t;
@ -11,30 +10,3 @@ using UInt8 = uint8_t;
using UInt16 = uint16_t;
using UInt32 = uint32_t;
using UInt64 = uint64_t;
/** This is not the best way to overcome an issue of different definitions
* of uint64_t and size_t on Linux and Mac OS X (both 64 bit).
*
* Note that on both platforms, long and long long are 64 bit types.
* But they are always different types (with the same physical representation).
*/
namespace std
{
inline UInt64 max(unsigned long x, unsigned long long y) { return x > y ? x : y; }
inline UInt64 max(unsigned long long x, unsigned long y) { return x > y ? x : y; }
inline UInt64 min(unsigned long x, unsigned long long y) { return x < y ? x : y; }
inline UInt64 min(unsigned long long x, unsigned long y) { return x < y ? x : y; }
inline Int64 max(long x, long long y) { return x > y ? x : y; }
inline Int64 max(long long x, long y) { return x > y ? x : y; }
inline Int64 min(long x, long long y) { return x < y ? x : y; }
inline Int64 min(long long x, long y) { return x < y ? x : y; }
}
/// Workaround for the issue, that KDevelop doesn't see time_t and size_t types (for syntax highlight).
#ifdef IN_KDEVELOP_PARSER
using time_t = Int64;
using size_t = UInt64;
#endif

View File

@ -32,7 +32,7 @@ set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cd $CURDIR
source "./release_lib.sh"
source "./utils/release/release_lib.sh"
PBUILDER_AUTOUPDATE=${PBUILDER_AUTOUPDATE=4320}

View File

@ -106,7 +106,7 @@ static void mutate(pcg64 & generator, void * src, size_t length)
&& isAlphaASCII(pos[2]))
{
auto res = rand(generator, 0, 3);
if (res == 2)
if (res == 2)
{
std::swap(pos[0], pos[1]);
}
@ -118,7 +118,7 @@ static void mutate(pcg64 & generator, void * src, size_t length)
else if (pos + 5 <= end
&& pos[0] >= 0xC0 && pos[0] <= 0xDF && pos[1] >= 0x80 && pos[1] <= 0xBF
&& pos[2] >= 0x20 && pos[2] < 0x80 && !isAlphaASCII(pos[2])
&& pos[3] >= 0xC0 && pos[0] <= 0xDF && pos[4] >= 0x80 && pos[4] <= 0xBF)
&& pos[3] >= 0xC0 && pos[3] <= 0xDF && pos[4] >= 0x80 && pos[4] <= 0xBF)
{
auto res = rand(generator, 0, 3);
if (res == 2)

View File
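
The fuzzer fix above corrects an index slip: the fourth byte of the candidate sequence was being range-checked through pos[0] instead of pos[3]. A minimal sketch of the byte classification that condition relies on (the helper names are hypothetical, not part of the patch): a two-byte UTF-8 sequence is a lead byte in [0xC0, 0xDF] followed by a continuation byte in [0x80, 0xBF].

#include <cassert>
#include <cstdint>

// Hypothetical helpers spelling out the ranges used in the corrected condition.
static bool isTwoByteLead(uint8_t b) { return b >= 0xC0 && b <= 0xDF; }
static bool isContinuation(uint8_t b) { return b >= 0x80 && b <= 0xBF; }

int main()
{
    const uint8_t cyrillic_a[] = {0xD0, 0xB0};  // UTF-8 encoding of Cyrillic 'а'
    assert(isTwoByteLead(cyrillic_a[0]) && isContinuation(cyrillic_a[1]));
    assert(!isTwoByteLead(0x41));  // ASCII 'A' is a single-byte character, not a lead byte
    return 0;
}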

@ -9,7 +9,7 @@ function gen_version_string {
}
function get_version {
BASEDIR=$(dirname "${BASH_SOURCE[0]}")
BASEDIR=$(dirname "${BASH_SOURCE[0]}")/../../
VERSION_REVISION=`grep "set(VERSION_REVISION" ${BASEDIR}/dbms/cmake/version.cmake | sed 's/^.*VERSION_REVISION \(.*\)$/\1/' | sed 's/[) ].*//'`
VERSION_MAJOR=`grep "set(VERSION_MAJOR" ${BASEDIR}/dbms/cmake/version.cmake | sed 's/^.*VERSION_MAJOR \(.*\)/\1/' | sed 's/[) ].*//'`
VERSION_MINOR=`grep "set(VERSION_MINOR" ${BASEDIR}/dbms/cmake/version.cmake | sed 's/^.*VERSION_MINOR \(.*\)/\1/' | sed 's/[) ].*//'`