Merge branch 'master' into remove-system-tables-lazy-load

Alexey Milovidov 2020-05-31 18:06:08 +03:00
commit 87cf123ef8
80 changed files with 806 additions and 1227 deletions


@ -4,6 +4,7 @@
namespace DB
{
void OwnFormattingChannel::logExtended(const ExtendedLogMessage & msg)
{
if (pChannel && priority >= msg.base.getPriority())
@ -28,5 +29,4 @@ void OwnFormattingChannel::log(const Poco::Message & msg)
OwnFormattingChannel::~OwnFormattingChannel() = default;
}


@ -69,7 +69,6 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
logs_queue->emplace(std::move(columns));
}
/// Also log to system.text_log table, if message is not too noisy
auto text_log_max_priority_loaded = text_log_max_priority.load(std::memory_order_relaxed);
if (text_log_max_priority_loaded && msg.getPriority() <= text_log_max_priority_loaded)
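The hunk above caches the atomic text_log_max_priority into a local with a relaxed load, so the "is it enabled" check and the later comparison see one consistent value. A minimal standalone sketch of that pattern, with simplified names and not the actual ClickHouse logging code:

```cpp
#include <atomic>
#include <iostream>

std::atomic<int> text_log_max_priority{0};   // 0 means "logging to text_log is disabled"

void maybeLog(int message_priority)
{
    // Load the atomic once; the check and the use rely on the same cached value.
    const int max_priority = text_log_max_priority.load(std::memory_order_relaxed);
    if (max_priority && message_priority <= max_priority)
        std::cout << "would log message with priority " << message_priority << '\n';
}

int main()
{
    text_log_max_priority.store(6, std::memory_order_relaxed);
    maybeLog(3);   // passes the filter: 3 <= 6
    maybeLog(7);   // filtered out
}
```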


@ -80,7 +80,9 @@ RUN apt-get --allow-unauthenticated update -y \
pigz \
moreutils \
libcctz-dev \
libldap2-dev
libldap2-dev \
libsasl2-dev \
heimdal-multidev


@ -32,8 +32,8 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/; \
ln -s /usr/lib/llvm-9/bin/llvm-symbolizer /usr/bin/llvm-symbolizer; \
if [ -n $USE_DATABASE_ATOMIC ] && [ $USE_DATABASE_ATOMIC -eq 1 ]; then ln -s /usr/share/clickhouse-test/config/database_atomic_configd.xml /etc/clickhouse-server/config.d/; fi; \
if [ -n $USE_DATABASE_ATOMIC ] && [ $USE_DATABASE_ATOMIC -eq 1 ]; then ln -s /usr/share/clickhouse-test/config/database_atomic_usersd.xml /etc/clickhouse-server/users.d/; fi; \
if [[ -n "$USE_DATABASE_ATOMIC" ]] && [[ "$USE_DATABASE_ATOMIC" -eq 1 ]]; then ln -s /usr/share/clickhouse-test/config/database_atomic_configd.xml /etc/clickhouse-server/config.d/; fi; \
if [[ -n "$USE_DATABASE_ATOMIC" ]] && [[ "$USE_DATABASE_ATOMIC" -eq 1 ]]; then ln -s /usr/share/clickhouse-test/config/database_atomic_usersd.xml /etc/clickhouse-server/users.d/; fi; \
echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7'" >> /etc/environment; \
echo "TSAN_SYMBOLIZER_PATH=/usr/lib/llvm-8/bin/llvm-symbolizer" >> /etc/environment; \
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \


@ -78,9 +78,9 @@ CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
ln -s /usr/share/clickhouse-test/config/server.key /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/server.crt /etc/clickhouse-server/; \
ln -s /usr/share/clickhouse-test/config/dhparam.pem /etc/clickhouse-server/; \
if [ -n $USE_POLYMORPHIC_PARTS ] && [ $USE_POLYMORPHIC_PARTS -eq 1 ]; then ln -s /usr/share/clickhouse-test/config/polymorphic_parts.xml /etc/clickhouse-server/config.d/; fi; \
if [ -n $USE_DATABASE_ATOMIC ] && [ $USE_DATABASE_ATOMIC -eq 1 ]; then ln -s /usr/share/clickhouse-test/config/database_atomic_configd.xml /etc/clickhouse-server/config.d/; fi; \
if [ -n $USE_DATABASE_ATOMIC ] && [ $USE_DATABASE_ATOMIC -eq 1 ]; then ln -s /usr/share/clickhouse-test/config/database_atomic_usersd.xml /etc/clickhouse-server/users.d/; fi; \
if [[ -n "$USE_POLYMORPHIC_PARTS" ]] && [[ "$USE_POLYMORPHIC_PARTS" -eq 1 ]]; then ln -s /usr/share/clickhouse-test/config/polymorphic_parts.xml /etc/clickhouse-server/config.d/; fi; \
if [[ -n "$USE_DATABASE_ATOMIC" ]] && [[ "$USE_DATABASE_ATOMIC" -eq 1 ]]; then ln -s /usr/share/clickhouse-test/config/database_atomic_configd.xml /etc/clickhouse-server/config.d/; fi; \
if [[ -n "$USE_DATABASE_ATOMIC" ]] && [[ "$USE_DATABASE_ATOMIC" -eq 1 ]]; then ln -s /usr/share/clickhouse-test/config/database_atomic_usersd.xml /etc/clickhouse-server/users.d/; fi; \
ln -sf /usr/share/clickhouse-test/config/client_config.xml /etc/clickhouse-client/config.xml; \
service zookeeper start; sleep 5; \
service clickhouse-server start && sleep 5 && clickhouse-test --testname --shard --zookeeper $ADDITIONAL_OPTIONS $SKIP_TESTS_OPTION 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt


@ -1583,6 +1583,11 @@ private:
if (std::string::npos != embedded_stack_trace_pos && !config().getBool("stacktrace", false))
text.resize(embedded_stack_trace_pos);
/// If we probably have progress bar, we should add additional newline,
/// otherwise exception may display concatenated with the progress bar.
if (need_render_progress)
std::cerr << '\n';
std::cerr << "Received exception from server (version " << server_version << "):" << std::endl
<< "Code: " << e.code() << ". " << text << std::endl;
}


@ -4,6 +4,8 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/setThreadName.h>
namespace DB
{
@ -177,7 +179,11 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]()
{
setThreadName("DiscoverPartns");
discoverShardPartitions(timeouts, task_shard);
});
LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active());
thread_pool.wait();
@ -609,8 +615,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
size_t num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_alter_ast_string,
nullptr,
&settings_push,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
@ -638,8 +643,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_deduplicate_ast_string,
nullptr,
&task_cluster->settings_push,
task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
@ -818,8 +822,7 @@ bool ClusterCopier::tryDropPartitionPiece(
/// We have to drop partition_piece on each replica
size_t num_shards = executeQueryOnCluster(
cluster_push, query,
nullptr,
&settings_push,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
@ -1293,7 +1296,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
local_context.setSettings(task_cluster->settings_pull);
local_context.setSetting("skip_unavailable_shards", true);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().in);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().getInputStream());
count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
}
@ -1356,9 +1359,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
}
@ -1403,7 +1404,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
input = io_select.in;
input = io_select.getInputStream();
output = io_insert.out;
}
@ -1479,9 +1480,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query,
create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
}
catch (...)
@ -1548,8 +1547,7 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
nullptr,
&settings_push,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
@ -1575,8 +1573,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
nullptr,
&settings_push,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
@ -1690,7 +1687,7 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
std::set<String> res;
if (block)
@ -1735,7 +1732,7 @@ const auto & settings = context.getSettingsRef();
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
return InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows() != 0;
return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
}
bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTimeouts & timeouts,
@ -1774,7 +1771,7 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
Context local_context = context;
local_context.setSettings(task_cluster->settings_pull);
auto result = InterpreterFactory::get(query_ast, local_context)->execute().in->read().rows();
auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
if (result != 0)
LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
else
@ -1788,25 +1785,16 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
UInt64 ClusterCopier::executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const ASTPtr & query_ast_,
const Settings * settings,
const Settings & current_settings,
PoolMode pool_mode,
ClusterExecutionMode execution_mode,
UInt64 max_successful_executions_per_shard) const
{
Settings current_settings = settings ? *settings : task_cluster->settings_common;
auto num_shards = cluster->getShardsInfo().size();
std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
ASTPtr query_ast;
if (query_ast_ == nullptr)
{
ParserQuery p_query(query.data() + query.size());
query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
}
else
query_ast = query_ast_;
ParserQuery p_query(query.data() + query.size());
ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
/// We will have to execute query on each replica of a shard.
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
@ -1815,8 +1803,10 @@ UInt64 ClusterCopier::executeQueryOnCluster(
std::atomic<size_t> origin_replicas_number;
/// We need to execute query on one replica at least
auto do_for_shard = [&] (UInt64 shard_index)
auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)
{
setThreadName("QueryForShard");
const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
num_successful_executions = 0;
@ -1846,10 +1836,10 @@ UInt64 ClusterCopier::executeQueryOnCluster(
/// Will try to make as many as possible queries
if (shard.hasRemoteConnections())
{
current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &current_settings, pool_mode);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
for (auto & connection : connections)
{
@ -1859,7 +1849,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
try
{
/// CREATE TABLE and DROP PARTITION queries return empty block
RemoteBlockInputStream stream{*connection, query, Block{}, context, &current_settings};
RemoteBlockInputStream stream{*connection, query, Block{}, context, &shard_settings};
NullBlockOutputStream output{Block{}};
copyData(stream, output);
@ -1878,7 +1868,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); });
thread_pool.wait();
}
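Besides passing the settings by reference, this executeQueryOnCluster change stops mutating shared state from inside the per-shard jobs: each lambda now captures its own shard_settings copy and adjusts max_parallel_replicas locally. A rough sketch of that idea using plain std::thread and a simplified Settings struct (assumed names, not the ClickHouse ThreadPool API):

```cpp
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

struct Settings { std::uint64_t max_parallel_replicas = 1; };

int main()
{
    Settings current_settings;            // shared; treated as read-only from here on
    const std::size_t num_shards = 4;
    std::vector<std::thread> pool;

    for (std::size_t shard_index = 0; shard_index < num_shards; ++shard_index)
        pool.emplace_back([shard_index, shard_settings = current_settings]() mutable
        {
            // Mutating the per-job copy cannot race with the other shards.
            shard_settings.max_parallel_replicas = shard_index + 1;
            std::cout << "shard " << shard_index << " uses "
                      << shard_settings.max_parallel_replicas << " parallel replicas\n";
        });

    for (auto & t : pool)
        t.join();
}
```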
@ -1898,7 +1888,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
}
return successful_nodes;
}
}


@ -15,7 +15,6 @@ namespace DB
class ClusterCopier
{
public:
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
@ -187,8 +186,7 @@ protected:
UInt64 executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const ASTPtr & query_ast_ = nullptr,
const Settings * settings = nullptr,
const Settings & current_settings,
PoolMode pool_mode = PoolMode::GET_ALL,
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
UInt64 max_successful_executions_per_shard = 0) const;


@ -236,6 +236,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (ThreadFuzzer::instance().isEffective())
LOG_WARNING(log, "ThreadFuzzer is enabled. Application will run slowly and unstable.");
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
LOG_WARNING(log, "Server was built in debug mode. It will work slowly.");
#endif
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER)
LOG_WARNING(log, "Server was built with sanitizer. It will work slowly.");
#endif
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/


@ -5,8 +5,10 @@
#include <Poco/String.h>
#include <common/logger_useful.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <common/demangle.h>
#include <Common/formatReadable.h>
#include <Common/filesystemHelpers.h>
@ -25,6 +27,8 @@ namespace ErrorCodes
extern const int STD_EXCEPTION;
extern const int UNKNOWN_EXCEPTION;
extern const int LOGICAL_ERROR;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MREMAP;
}
@ -156,6 +160,64 @@ static void getNoSpaceLeftInfoMessage(std::filesystem::path path, std::string &
#endif
}
/** It is possible that the system has enough memory,
* but we have shortage of the number of available memory mappings.
* Provide good diagnostic to user in that case.
*/
static void getNotEnoughMemoryMessage(std::string & msg)
{
#if defined(__linux__)
try
{
static constexpr size_t buf_size = 4096;
char buf[buf_size];
UInt64 max_map_count = 0;
{
ReadBufferFromFile file("/proc/sys/vm/max_map_count", buf_size, -1, buf);
readText(max_map_count, file);
}
UInt64 num_maps = 0;
{
ReadBufferFromFile file("/proc/self/maps", buf_size, -1, buf);
while (!file.eof())
{
char * next_pos = find_first_symbols<'\n'>(file.position(), file.buffer().end());
file.position() = next_pos;
if (!file.hasPendingData())
continue;
if (*file.position() == '\n')
{
++num_maps;
++file.position();
}
}
}
if (num_maps > max_map_count * 0.99)
{
msg += fmt::format(
"\nIt looks like that the process is near the limit on number of virtual memory mappings."
"\nCurrent number of mappings (/proc/self/maps): {}."
"\nLimit on number of mappings (/proc/sys/vm/max_map_count): {}."
"\nYou should increase the limit for vm.max_map_count in /etc/sysctl.conf"
"\n",
num_maps, max_map_count);
}
}
catch (...)
{
msg += "\nCannot obtain additional info about memory usage.";
}
#else
(void)msg;
#endif
}
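The new getNotEnoughMemoryMessage helper compares the number of entries in /proc/self/maps against the vm.max_map_count limit and appends a hint when the process is close to running out of mappings. A self-contained sketch of the same check, assuming Linux /proc semantics and using only the standard library rather than the ClickHouse ReadBuffer classes:

```cpp
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>

int main()
{
    std::uint64_t max_map_count = 0;
    std::ifstream("/proc/sys/vm/max_map_count") >> max_map_count;

    std::uint64_t num_maps = 0;
    std::ifstream maps("/proc/self/maps");
    for (std::string line; std::getline(maps, line);)
        ++num_maps;   // /proc/self/maps contains one line per mapping

    if (max_map_count != 0 && num_maps > max_map_count * 0.99)
        std::cerr << "Process is near the limit on virtual memory mappings: "
                  << num_maps << " of " << max_map_count
                  << "; consider raising vm.max_map_count\n";
    else
        std::cout << num_maps << " mappings in use, limit " << max_map_count << '\n';
}
```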
static std::string getExtraExceptionInfo(const std::exception & e)
{
String msg;
@ -170,6 +232,13 @@ static std::string getExtraExceptionInfo(const std::exception & e)
{
if (errno_exception->getErrno() == ENOSPC && errno_exception->getPath())
getNoSpaceLeftInfoMessage(errno_exception->getPath().value(), msg);
else if (errno_exception->code() == ErrorCodes::CANNOT_ALLOCATE_MEMORY
|| errno_exception->code() == ErrorCodes::CANNOT_MREMAP)
getNotEnoughMemoryMessage(msg);
}
else if (dynamic_cast<const std::bad_alloc *>(&e))
{
getNotEnoughMemoryMessage(msg);
}
}
catch (...)


@ -234,14 +234,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
--scheduled_jobs;
}
DB::tryLogCurrentException("ThreadPool",
std::string("Exception in ThreadPool(") +
"max_threads: " + std::to_string(max_threads)
+ ", max_free_threads: " + std::to_string(max_free_threads)
+ ", queue_size: " + std::to_string(queue_size)
+ ", shutdown_on_exception: " + std::to_string(shutdown_on_exception)
+ ").");
job_finished.notify_all();
new_job_or_shutdown.notify_all();
return;


@ -310,7 +310,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_execution_speed, 0, "Maximum number of execution rows per second.", 0) \
M(SettingUInt64, min_execution_speed_bytes, 0, "Minimum number of execution bytes per second.", 0) \
M(SettingUInt64, max_execution_speed_bytes, 0, "Maximum number of execution bytes per second.", 0) \
M(SettingSeconds, timeout_before_checking_execution_speed, 0, "Check that the speed is not too low after the specified time has elapsed.", 0) \
M(SettingSeconds, timeout_before_checking_execution_speed, 10, "Check that the speed is not too low after the specified time has elapsed.", 0) \
\
M(SettingUInt64, max_columns_to_read, 0, "", 0) \
M(SettingUInt64, max_temporary_columns, 0, "", 0) \
@ -425,6 +425,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingSeconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
M(SettingBool, materialize_ttl_after_modify, true, "Apply TTL for old data, after ALTER MODIFY TTL query", 0) \
\
M(SettingBool, allow_experimental_geo_types, false, "Allow geo data types such as Point, Ring, Polygon, MultiPolygon", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13", 0) \


@ -13,6 +13,7 @@ limitations under the License. */
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
@ -38,7 +39,12 @@ protected:
Block res = *it;
++it;
return Chunk(res.getColumns(), res.rows());
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = res.info.bucket_num;
info->is_overflows = res.info.is_overflows;
return Chunk(res.getColumns(), res.rows(), std::move(info));
}
private:


@ -30,7 +30,8 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang).
/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount,
/// and otherwise it's too easy to make query hang).
sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds);
sleepForMicroseconds(sleep_microseconds);
@ -45,14 +46,14 @@ void ExecutionSpeedLimits::throttle(
{
if ((min_execution_rps != 0 || max_execution_rps != 0
|| min_execution_bps != 0 || max_execution_bps != 0
|| (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0)) &&
(static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
|| (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
{
/// Do not count sleeps in throttlers
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
double elapsed_seconds = 0;
if (throttler_sleep_microseconds > total_elapsed_microseconds)
if (total_elapsed_microseconds > throttler_sleep_microseconds)
elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;
if (elapsed_seconds > 0)
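The throttle change above fixes an inverted comparison: elapsed_seconds was previously computed only when the throttler sleep exceeded the total elapsed time, so in the common case it stayed zero (and the unsigned subtraction could underflow), and the minimum-speed check never fired. A tiny sketch of the corrected computation, as a hypothetical helper:

```cpp
#include <cassert>
#include <cstdint>

// Wall-clock time in seconds excluding time spent sleeping inside throttlers.
double effectiveElapsedSeconds(std::uint64_t total_elapsed_us, std::uint64_t throttler_sleep_us)
{
    if (total_elapsed_us > throttler_sleep_us)   // corrected direction; also avoids underflow
        return static_cast<double>(total_elapsed_us - throttler_sleep_us) / 1e6;
    return 0.0;   // everything measured so far was throttler sleep
}

int main()
{
    assert(effectiveElapsedSeconds(2'000'000, 500'000) == 1.5);
    assert(effectiveElapsedSeconds(300'000, 500'000) == 0.0);
}
```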


@ -274,7 +274,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
StorageValues::create(
storage->getStorageID(), storage->getColumns(), block, storage->getVirtuals()));
select.emplace(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select->execute().in);
in = std::make_shared<MaterializingBlockInputStream>(select->execute().getInputStream());
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY


@ -0,0 +1,131 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeCustom.h>
#include <DataTypes/DataTypeCustomSimpleTextSerialization.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace
{
class DataTypeCustomPointSerialization : public DataTypeCustomSimpleTextSerialization
{
public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override
{
nestedDataType()->serializeAsText(column, row_num, ostr, settings);
}
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
nestedDataType()->deserializeAsWholeText(column, istr, settings);
}
static DataTypePtr nestedDataType()
{
static auto data_type = DataTypePtr(std::make_unique<DataTypeTuple>(
DataTypes({std::make_unique<DataTypeFloat64>(), std::make_unique<DataTypeFloat64>()})));
return data_type;
}
};
class DataTypeCustomRingSerialization : public DataTypeCustomSimpleTextSerialization
{
public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override
{
nestedDataType()->serializeAsText(column, row_num, ostr, settings);
}
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
nestedDataType()->deserializeAsWholeText(column, istr, settings);
}
static DataTypePtr nestedDataType()
{
static auto data_type = DataTypePtr(std::make_unique<DataTypeArray>(DataTypeCustomPointSerialization::nestedDataType()));
return data_type;
}
};
class DataTypeCustomPolygonSerialization : public DataTypeCustomSimpleTextSerialization
{
public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override
{
nestedDataType()->serializeAsText(column, row_num, ostr, settings);
}
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
nestedDataType()->deserializeAsWholeText(column, istr, settings);
}
static DataTypePtr nestedDataType()
{
static auto data_type = DataTypePtr(std::make_unique<DataTypeArray>(DataTypeCustomRingSerialization::nestedDataType()));
return data_type;
}
};
class DataTypeCustomMultiPolygonSerialization : public DataTypeCustomSimpleTextSerialization
{
public:
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override
{
nestedDataType()->serializeAsText(column, row_num, ostr, settings);
}
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override
{
nestedDataType()->deserializeAsWholeText(column, istr, settings);
}
static DataTypePtr nestedDataType()
{
static auto data_type = DataTypePtr(std::make_unique<DataTypeArray>(DataTypeCustomPolygonSerialization::nestedDataType()));
return data_type;
}
};
}
void registerDataTypeDomainGeo(DataTypeFactory & factory)
{
// Custom type for point represented as its coordinates stored as Tuple(Float64, Float64)
factory.registerSimpleDataTypeCustom("Point", []
{
return std::make_pair(DataTypeFactory::instance().get("Tuple(Float64, Float64)"),
std::make_unique<DataTypeCustomDesc>(std::make_unique<DataTypeCustomFixedName>("Point"), std::make_unique<DataTypeCustomPointSerialization>()));
});
// Custom type for simple polygon without holes stored as Array(Point)
factory.registerSimpleDataTypeCustom("Ring", []
{
return std::make_pair(DataTypeFactory::instance().get("Array(Point)"),
std::make_unique<DataTypeCustomDesc>(std::make_unique<DataTypeCustomFixedName>("Ring"), std::make_unique<DataTypeCustomRingSerialization>()));
});
// Custom type for polygon with holes stored as Array(Ring)
// First element of outer array is outer shape of polygon and all the following are holes
factory.registerSimpleDataTypeCustom("Polygon", []
{
return std::make_pair(DataTypeFactory::instance().get("Array(Ring)"),
std::make_unique<DataTypeCustomDesc>(std::make_unique<DataTypeCustomFixedName>("Polygon"), std::make_unique<DataTypeCustomPolygonSerialization>()));
});
// Custom type for multiple polygons with holes stored as Array(Polygon)
factory.registerSimpleDataTypeCustom("MultiPolygon", []
{
return std::make_pair(DataTypeFactory::instance().get("Array(Polygon)"),
std::make_unique<DataTypeCustomDesc>(std::make_unique<DataTypeCustomFixedName>("MultiPolygon"), std::make_unique<DataTypeCustomMultiPolygonSerialization>()));
});
}
}
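The new DataTypeCustomGeo.cpp registers Point, Ring, Polygon and MultiPolygon as custom names over nested standard types; creating columns of these types is gated by the allow_experimental_geo_types setting added elsewhere in this commit. The nesting can be pictured with plain standard-library containers; this is only an illustration, not the ClickHouse type system:

```cpp
#include <array>
#include <iostream>
#include <vector>

using Point = std::array<double, 2>;        // Tuple(Float64, Float64)
using Ring = std::vector<Point>;            // Array(Point)
using Polygon = std::vector<Ring>;          // Array(Ring): ring 0 is the outer shape, the rest are holes
using MultiPolygon = std::vector<Polygon>;  // Array(Polygon)

int main()
{
    Polygon square_with_hole = {
        {{0, 0}, {10, 0}, {10, 10}, {0, 10}},   // outer ring
        {{4, 4}, {6, 4}, {6, 6}, {4, 6}},       // hole
    };
    MultiPolygon mp = {square_with_hole};
    std::cout << "polygons: " << mp.size()
              << ", points in outer ring: " << mp[0][0].size() << '\n';
}
```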


@ -180,6 +180,7 @@ DataTypeFactory::DataTypeFactory()
registerDataTypeLowCardinality(*this);
registerDataTypeDomainIPv4AndIPv6(*this);
registerDataTypeDomainSimpleAggregateFunction(*this);
registerDataTypeDomainGeo(*this);
}
DataTypeFactory & DataTypeFactory::instance()


@ -83,5 +83,6 @@ void registerDataTypeLowCardinality(DataTypeFactory & factory);
void registerDataTypeDomainIPv4AndIPv6(DataTypeFactory & factory);
void registerDataTypeDomainSimpleAggregateFunction(DataTypeFactory & factory);
void registerDataTypeDateTime64(DataTypeFactory & factory);
void registerDataTypeDomainGeo(DataTypeFactory & factory);
}


@ -1,3 +1,4 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
LIBRARY()
PEERDIR(
@ -9,12 +10,13 @@ SRCS(
convertMySQLDataType.cpp
DataTypeAggregateFunction.cpp
DataTypeArray.cpp
DataTypeCustomGeo.cpp
DataTypeCustomIPv4AndIPv6.cpp
DataTypeCustomSimpleAggregateFunction.cpp
DataTypeCustomSimpleTextSerialization.cpp
DataTypeDate.cpp
DataTypeDateTime.cpp
DataTypeDateTime64.cpp
DataTypeDateTime.cpp
DataTypeDecimalBase.cpp
DataTypeEnum.cpp
DataTypeFactory.cpp
@ -36,6 +38,7 @@ SRCS(
getMostSubtype.cpp
IDataType.cpp
NestedUtils.cpp
)
END()

src/DataTypes/ya.make.in (new file, 12 lines)

@ -0,0 +1,12 @@
LIBRARY()
PEERDIR(
clickhouse/src/Common
clickhouse/src/Formats
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>
)
END()


@ -1,3 +1,4 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
LIBRARY()
PEERDIR(
@ -24,8 +25,8 @@ SRCS(
ComplexKeyCacheDictionary_generate3.cpp
ComplexKeyCacheDictionary_setAttributeValue.cpp
ComplexKeyCacheDictionary_setDefaultAttributeValue.cpp
ComplexKeyHashedDictionary.cpp
ComplexKeyDirectDictionary.cpp
ComplexKeyHashedDictionary.cpp
DictionaryBlockInputStreamBase.cpp
DictionaryFactory.cpp
DictionarySourceFactory.cpp


@ -145,6 +145,8 @@ struct ConvertImpl
vec_to[i] = convertFromDecimal<FromDataType, ToDataType>(vec_from[i], vec_from.getScale());
else if constexpr (IsDataTypeNumber<FromDataType> && IsDataTypeDecimal<ToDataType>)
vec_to[i] = convertToDecimal<FromDataType, ToDataType>(vec_from[i], vec_to.getScale());
else
throw Exception("Unsupported data type in conversion function", ErrorCodes::CANNOT_CONVERT_TYPE);
}
else
vec_to[i] = static_cast<ToFieldType>(vec_from[i]);


@ -7,11 +7,13 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnString.h>
#include <Interpreters/Context.h>
#include <ext/scope_guard.h>
#include <thread>
#include <memory>
#include <cstdlib>
#include <unistd.h>
#include <sys/mman.h>
namespace DB
@ -22,6 +24,7 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_ALLOCATE_MEMORY;
}
@ -132,6 +135,25 @@ public:
{
(void)context.getCurrentQueryId();
}
else if (mode == "mmap many")
{
std::vector<void *> maps;
SCOPE_EXIT(
{
//for (void * map : maps)
// munmap(map, 4096);
});
while (true)
{
void * hint = reinterpret_cast<void *>(
std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(thread_local_rng));
void * map = mmap(hint, 4096, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == map)
throwFromErrno("Allocator: Cannot mmap", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
maps.push_back(map);
}
}
else
throw Exception("Unknown trap mode", ErrorCodes::BAD_ARGUMENTS);
}


@ -22,6 +22,12 @@ struct ExtractRaw
expects_end.pop_back();
current_expect_end = expects_end.empty() ? 0 : expects_end.back();
}
else if (current_expect_end == '"')
{
/// skip backslash
if (*pos == '\\' && pos + 1 < end && pos[1] == '"')
++pos;
}
else
{
switch (*pos)
@ -38,11 +44,6 @@ struct ExtractRaw
current_expect_end = '"';
expects_end.push_back(current_expect_end);
break;
case '\\':
/// skip backslash
if (pos + 1 < end && pos[1] == '"')
pos++;
break;
default:
if (!current_expect_end && (*pos == ',' || *pos == '}'))
{
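The ExtractRaw change adds a dedicated branch for the inside of a string literal: while current_expect_end is '"', the only special character is a backslash escaping a quote, and braces, brackets, or quotes inside the string are no longer fed to the structural switch. A minimal sketch of a scanner with that rule (hypothetical skipRawValue helper, not the ClickHouse visitParam code):

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Returns the offset just past the raw value that starts at position 0.
std::size_t skipRawValue(std::string_view s)
{
    bool in_string = false;
    int depth = 0;
    for (std::size_t pos = 0; pos < s.size(); ++pos)
    {
        const char c = s[pos];
        if (in_string)
        {
            if (c == '\\' && pos + 1 < s.size())
                ++pos;              // escapes are special only inside strings
            else if (c == '"')
                in_string = false;
        }
        else if (c == '"')
            in_string = true;
        else if (c == '{' || c == '[')
            ++depth;
        else if (c == '}' || c == ']')
        {
            if (depth == 0)
                return pos;         // closing brace of the enclosing object
            --depth;
        }
        else if (c == ',' && depth == 0)
            return pos;             // end of a top-level value
    }
    return s.size();
}

int main()
{
    // The escaped quote inside the string must not terminate it.
    assert(skipRawValue("\"a\\\"b\", \"next\"") == 6);
    // A brace inside a string is not structural.
    assert(skipRawValue("\"a}b\", x") == 5);
    // Outside a string the backslash is ordinary data; the value ends at the comma.
    assert(skipRawValue("ab\\, rest") == 3);
}
```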


@ -1,3 +1,4 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
LIBRARY()
CFLAGS(
@ -171,7 +172,6 @@ SRCS(
FunctionsRound.cpp
FunctionsStringArray.cpp
FunctionsStringSimilarity.cpp
FunctionUnixTimestamp64.h
GatherUtils/concat.cpp
GatherUtils/createArraySink.cpp
GatherUtils/createArraySource.cpp
@ -285,10 +285,10 @@ SRCS(
rand64.cpp
randConstant.cpp
rand.cpp
randomFixedString.cpp
randomPrintableASCII.cpp
randomString.cpp
randomStringUTF8.cpp
randomFixedString.cpp
regexpQuoteMeta.cpp
registerFunctionsArithmetic.cpp
registerFunctionsComparison.cpp
@ -308,8 +308,8 @@ SRCS(
registerFunctionsStringRegexp.cpp
registerFunctionsStringSearch.cpp
registerFunctionsTuple.cpp
registerFunctionsVisitParam.cpp
registerFunctionsUnixTimestamp64.cpp
registerFunctionsVisitParam.cpp
reinterpretAsFixedString.cpp
reinterpretAsString.cpp
reinterpretStringAs.cpp
@ -390,10 +390,10 @@ SRCS(
toTime.cpp
toTimeZone.cpp
toTypeName.cpp
toValidUTF8.cpp
toUnixTimestamp64Micro.cpp
toUnixTimestamp64Milli.cpp
toUnixTimestamp64Nano.cpp
toValidUTF8.cpp
toYear.cpp
toYYYYMM.cpp
toYYYYMMDD.cpp
@ -424,8 +424,8 @@ SRCS(
URL/fragment.cpp
URL/path.cpp
URL/pathFull.cpp
URL/protocol.cpp
URL/port.cpp
URL/protocol.cpp
URL/queryStringAndFragment.cpp
URL/queryString.cpp
URL/registerFunctionsURL.cpp


@ -706,7 +706,7 @@ SetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool no_su
{
auto interpreter = interpretSubquery(right_in_operand, data.context, data.subquery_depth, {});
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().in; });
interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().getInputStream(); });
/** Why is LazyBlockInputStream used?
*


@ -15,6 +15,7 @@
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
namespace ProfileEvents
{
@ -70,35 +71,14 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
Pipe createLocalStream(
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage,
bool add_totals_port, bool add_extremes_port, bool force_tree_shaped_pipeline)
QueryPipeline createLocalStream(
const ASTPtr & query_ast, const Block & header, const Context & context, QueryProcessingStage::Enum processed_stage)
{
checkStackSize();
InterpreterSelectQuery interpreter{query_ast, context, SelectQueryOptions(processed_stage)};
if (force_tree_shaped_pipeline)
{
/// This flag means that pipeline must be tree-shaped,
/// so we can't enable processors for InterpreterSelectQuery here.
auto stream = interpreter.execute().in;
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
if (add_totals_port)
source->addTotalsPort();
if (add_extremes_port)
source->addExtremesPort();
Pipe pipe(std::move(source));
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name));
return pipe;
}
auto pipeline = interpreter.executeWithProcessors();
auto pipeline = interpreter.execute().pipeline;
pipeline.addSimpleTransform([&](const Block & source_header)
{
@ -116,7 +96,8 @@ Pipe createLocalStream(
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
return std::move(pipeline).getPipe();
pipeline.setMaxThreads(1);
return pipeline;
}
String formattedAST(const ASTPtr & ast)
@ -134,7 +115,7 @@ void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info,
const SelectQueryInfo &,
Pipes & res)
{
bool force_add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
@ -152,8 +133,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage,
add_totals_port, add_extremes_port, query_info.force_tree_shaped_pipeline));
res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage).getPipe());
};
String modified_query = formattedAST(modified_query_ast);
@ -266,7 +246,7 @@ void SelectStreamFactory::createForShard(
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay, add_totals_port, add_extremes_port]()
stage = processed_stage, local_delay]()
-> BlockInputStreamPtr
{
auto current_settings = context.getSettingsRef();
@ -297,8 +277,8 @@ void SelectStreamFactory::createForShard(
}
if (try_results.empty() || local_delay < max_remote_delay)
return std::make_shared<TreeExecutorBlockInputStream>(
createLocalStream(modified_query_ast, header, context, stage, add_totals_port, add_extremes_port, true));
return std::make_shared<PipelineExecutingBlockInputStream>(
createLocalStream(modified_query_ast, header, context, stage));
else
{
std::vector<IConnectionPool::Entry> connections;


@ -111,11 +111,11 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
}
else
{
BlockIO res = interpreter.execute();
auto stream = interpreter.execute().getInputStream();
try
{
block = res.in->read();
block = stream->read();
if (!block)
{
@ -126,7 +126,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
return;
}
if (block.rows() != 1 || res.in->read())
if (block.rows() != 1 || stream->read())
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
}
catch (const Exception & e)


@ -297,13 +297,13 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
}
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, {}, query_options);
BlockIO res = interpreter_subquery->execute();
auto stream = interpreter_subquery->execute().getInputStream();
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true, context.getSettingsRef().transform_null_in);
set->setHeader(res.in->getHeader());
set->setHeader(stream->getHeader());
res.in->readPrefix();
while (Block block = res.in->read())
stream->readPrefix();
while (Block block = stream->read())
{
/// If the limits have been exceeded, give up and let the default subquery processing actions take place.
if (!set->insertFromBlock(block))
@ -311,7 +311,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
}
set->finishInsert();
res.in->readSuffix();
stream->readSuffix();
prepared_sets[set_key] = std::move(set);
}


@ -134,7 +134,7 @@ public:
ast = database_and_table_name;
external_tables[external_table_name] = external_storage_holder;
subqueries_for_sets[external_table_name].source = interpreter->execute().in;
subqueries_for_sets[external_table_name].source = interpreter->execute().getInputStream();
subqueries_for_sets[external_table_name].table = external_storage;
/** NOTE If it was written IN tmp_table - the existing temporary (but not external) table,


@ -2,14 +2,8 @@
#include <DataStreams/BlockIO.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/** Interpreters interface for different queries.
*/
@ -22,14 +16,10 @@ public:
*/
virtual BlockIO execute() = 0;
virtual QueryPipeline executeWithProcessors() { throw Exception("executeWithProcessors not implemented", ErrorCodes::NOT_IMPLEMENTED); }
virtual bool canExecuteWithProcessors() const { return false; }
virtual bool ignoreQuota() const { return false; }
virtual bool ignoreLimits() const { return false; }
virtual ~IInterpreter() {}
virtual ~IInterpreter() = default;
};
}


@ -70,6 +70,7 @@ namespace ErrorCodes
extern const int BAD_DATABASE_FOR_TEMPORARY_TABLE;
extern const int SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY;
extern const int DICTIONARY_ALREADY_EXISTS;
extern const int ILLEGAL_COLUMN;
}
@ -475,6 +476,21 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
}
}
}
if (!create.attach && !context.getSettingsRef().allow_experimental_geo_types)
{
for (const auto & name_and_type_pair : properties.columns.getAllPhysical())
{
const auto& type = name_and_type_pair.type->getName();
if (type == "MultiPolygon" || type == "Polygon" || type == "Ring" || type == "Point")
{
String message = "Cannot create table with column '" + name_and_type_pair.name + "' which type is '"
+ type + "' because experimental geo types are not allowed. "
+ "Set setting allow_experimental_geo_types = 1 in order to allow it.";
throw Exception(message, ErrorCodes::ILLEGAL_COLUMN);
}
}
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const


@ -204,7 +204,7 @@ BlockIO InterpreterInsertQuery::execute()
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res.pipeline = interpreter_select.executeWithProcessors();
res = interpreter_select.execute();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());

(File diff suppressed because it is too large.)


@ -77,12 +77,6 @@ public:
/// Execute a query. Get the stream of blocks to read.
BlockIO execute() override;
/// Execute the query and return multuple streams for parallel processing.
BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; }
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
@ -108,89 +102,15 @@ private:
Block getSampleBlockImpl();
struct Pipeline
{
/** Streams of data.
* The source data streams are produced in the executeFetchColumns function.
* Then they are converted (wrapped in other streams) using the `execute*` functions,
* to get the whole pipeline running the query.
*/
BlockInputStreams streams;
/** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
* It has a special meaning, since reading from it should be done after reading from the main streams.
* It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
*/
BlockInputStreamPtr stream_with_non_joined_data;
bool union_stream = false;
/// Cache value of InterpreterSelectQuery::max_streams
size_t max_threads = 1;
BlockInputStreamPtr & firstStream() { return streams.at(0); }
template <typename Transform>
void transform(Transform && transformation)
{
for (auto & stream : streams)
transformation(stream);
if (stream_with_non_joined_data)
transformation(stream_with_non_joined_data);
}
bool hasMoreThanOneStream() const
{
return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
}
/// Resulting stream is mix of other streams data. Distinct and/or order guaranties are broken.
bool hasMixedStreams() const
{
return hasMoreThanOneStream() || union_stream;
}
bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
bool initialized() const { return !streams.empty(); }
/// Compatibility with QueryPipeline (Processors)
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
size_t getNumThreads() const { return max_threads; }
};
template <typename TPipeline>
void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe, QueryPipeline & save_context_and_storage);
void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
/// Different stages of query execution.
/// dry_run - don't read from table, use empty header block instead.
void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
template <typename TPipeline>
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
void executeFetchColumns(
QueryProcessingStage::Enum processing_stage,
QueryPipeline & pipeline,
const PrewhereInfoPtr & prewhere_info,
const Names & columns_to_remove_after_prewhere,
QueryPipeline & save_context_and_storage);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
static void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info);
void executeWithFill(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline, Block header);
void executeLimitBy(Pipeline & pipeline);
void executeLimit(Pipeline & pipeline);
void executeOffset(Pipeline & pipeline);
static void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
const Names & columns_to_remove_after_prewhere);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
@ -213,17 +133,12 @@ private:
String generateFilterActions(ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const;
/// Add ConvertingBlockInputStream to specified header.
static void unifyStreams(Pipeline & pipeline, Block header);
enum class Modificator
{
ROLLUP = 0,
CUBE = 1
};
void executeRollupOrCube(Pipeline & pipeline, Modificator modificator);
void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.


@ -3,15 +3,9 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Columns/getLeastSuperColumn.h>
#include <Columns/ColumnConst.h>
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTExpressionList.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
@ -180,69 +174,10 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
}
BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline)
{
BlockInputStreams nested_streams;
for (auto & interpreter : nested_interpreters)
{
BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline);
nested_streams.insert(nested_streams.end(), streams.begin(), streams.end());
}
/// Unify data structure.
if (nested_interpreters.size() > 1)
{
for (auto & stream : nested_streams)
stream = std::make_shared<ConvertingBlockInputStream>(stream, result_header,ConvertingBlockInputStream::MatchColumnsMode::Position);
parent_pipeline.addInterpreterContext(context);
}
/// Update max_streams due to:
/// - max_distributed_connections for Distributed() engine
/// - max_streams_to_max_threads_ratio
///
/// XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to
/// number of streams, which is empty for non-Processors case.
max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto &a, const auto &b)
{
return a->getMaxStreams() < b->getMaxStreams();
}))->getMaxStreams();
return nested_streams;
}
BlockIO InterpreterSelectWithUnionQuery::execute()
{
BlockIO res;
BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
BlockInputStreamPtr result_stream;
if (nested_streams.empty())
{
result_stream = std::make_shared<NullBlockInputStream>(getSampleBlock());
}
else if (nested_streams.size() == 1)
{
result_stream = nested_streams.front();
nested_streams.clear();
}
else
{
result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, max_streams);
nested_streams.clear();
}
res.in = result_stream;
res.pipeline.addInterpreterContext(context);
return res;
}
QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
{
QueryPipeline main_pipeline;
QueryPipeline & main_pipeline = res.pipeline;
std::vector<QueryPipeline> pipelines;
bool has_main_pipeline = false;
@ -254,12 +189,12 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
if (!has_main_pipeline)
{
has_main_pipeline = true;
main_pipeline = interpreter->executeWithProcessors();
main_pipeline = interpreter->execute().pipeline;
headers.emplace_back(main_pipeline.getHeader());
}
else
{
pipelines.emplace_back(interpreter->executeWithProcessors());
pipelines.emplace_back(interpreter->execute().pipeline);
headers.emplace_back(pipelines.back().getHeader());
}
}
@ -280,7 +215,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors()
main_pipeline.addInterpreterContext(context);
return main_pipeline;
return res;
}


@ -29,12 +29,6 @@ public:
BlockIO execute() override;
/// Execute the query without union of streams.
BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
QueryPipeline executeWithProcessors() override;
bool canExecuteWithProcessors() const override { return true; }
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }


@ -184,7 +184,7 @@ bool isStorageTouchedByMutations(
/// For some reason it may copy context and give it into ExpressionBlockInputStream
/// after that we will use context from destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(select_query, context_copy, storage, SelectQueryOptions().ignoreLimits());
BlockInputStreamPtr in = interpreter.execute().in;
BlockInputStreamPtr in = interpreter.execute().getInputStream();
Block block = in->read();
if (!block.rows())
@ -687,7 +687,7 @@ void MutationsInterpreter::validate(TableStructureReadLockHolder &)
}
/// Do not use getSampleBlock in order to check the whole pipeline.
Block first_stage_header = select_interpreter->execute().in->getHeader();
Block first_stage_header = select_interpreter->execute().getInputStream()->getHeader();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(stages, in)->getHeader();
}
@ -697,7 +697,7 @@ BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
BlockInputStreamPtr in = select_interpreter->execute().in;
BlockInputStreamPtr in = select_interpreter->execute().getInputStream();
auto result_stream = addStreamsForLaterStages(stages, in);


@ -13,7 +13,7 @@ void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery>
{
joined_block_aliases = std::move(joined_block_aliases_);
source = std::make_shared<LazyBlockInputStream>(interpreter->getSampleBlock(),
[interpreter]() mutable { return interpreter->execute().in; });
[interpreter]() mutable { return interpreter->execute().getInputStream(); });
sample_block = source->getHeader();
renameColumns(sample_block);


@ -76,12 +76,6 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log");
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log");
if (metric_log)
{
size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds");
metric_log->startCollectMetric(collect_interval_milliseconds);
}
if (query_log)
logs.emplace_back(query_log.get());
if (query_thread_log)
@ -106,6 +100,12 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
shutdown();
throw;
}
if (metric_log)
{
size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds");
metric_log->startCollectMetric(collect_interval_milliseconds);
}
}


@ -280,7 +280,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
String query(begin, query_end);
BlockIO res;
QueryPipeline & pipeline = res.pipeline;
String query_for_logging;
@ -338,7 +337,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = interpreter->canExecuteWithProcessors();
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())
@ -358,10 +356,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
if (use_processors)
pipeline = interpreter->executeWithProcessors();
else
res = interpreter->execute();
res = interpreter->execute();
QueryPipeline & pipeline = res.pipeline;
bool use_processors = pipeline.initialized();
if (res.pipeline.initialized())
use_processors = true;


@ -48,7 +48,6 @@ static void loadDatabase(
const String & database_path,
bool force_restore_data)
{
String database_attach_query;
String database_metadata_file = database_path + ".sql";


@ -74,10 +74,6 @@ void PullingPipelineExecutor::cancel()
/// Cancel execution if it wasn't finished.
if (executor)
executor->cancel();
/// Read all data and finish execution.
Chunk chunk;
while (pull(chunk));
}
Chunk PullingPipelineExecutor::getTotals()


@ -59,7 +59,11 @@ ConvertingTransform::ConvertingTransform(
break;
case MatchColumnsMode::Name:
if (source.has(res_elem.name))
/// It may seem strange, but sometimes block may have columns with the same name.
/// For this specific case, try to get column from the same position if it has correct name first.
if (result_col_num < source.columns() && source.getByPosition(result_col_num).name == res_elem.name)
conversion[result_col_num] = result_col_num;
else if (source.has(res_elem.name))
conversion[result_col_num] = source.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",


@ -397,25 +397,6 @@ void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settin
}
}
BlockInputStreams IStorage::readStreams(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
ForceTreeShapedPipeline enable_tree_shape(query_info);
auto pipes = read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
BlockInputStreams res;
res.reserve(pipes.size());
for (auto & pipe : pipes)
res.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipe)));
return res;
}
StorageID IStorage::getStorageID() const
{


@ -306,16 +306,6 @@ public:
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** The same as read, but returns BlockInputStreams.
*/
BlockInputStreams readStreams(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/);
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
* Returns an object by which you can write data sequentially.


@ -111,7 +111,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(const Context & conte
InterpreterSelectQuery interpreter(mergeable_query->clone(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().getInputStream());
while (Block this_block = view_mergeable_stream->read())
base_blocks->push_back(this_block);
@ -148,7 +148,7 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(global_context, creator));
InterpreterSelectQuery select(getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().getInputStream());
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
@ -218,7 +218,7 @@ void StorageLiveView::writeIntoLiveView(
QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().in);
select_block.execute().getInputStream());
while (Block this_block = data_mergeable_stream->read())
new_mergeable_blocks->push_back(this_block);

View File

@ -181,8 +181,11 @@ const KeyCondition::AtomMap KeyCondition::atom_map
},
{
"empty",
[] (RPNElement & out, const Field &)
[] (RPNElement & out, const Field & value)
{
if (value.getType() != Field::Types::String)
return false;
out.function = RPNElement::FUNCTION_IN_RANGE;
out.range = Range("");
return true;
@ -190,8 +193,11 @@ const KeyCondition::AtomMap KeyCondition::atom_map
},
{
"notEmpty",
[] (RPNElement & out, const Field &)
[] (RPNElement & out, const Field & value)
{
if (value.getType() != Field::Types::String)
return false;
out.function = RPNElement::FUNCTION_NOT_IN_RANGE;
out.range = Range("");
return true;
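A standalone sketch of the guard added above (tryEmptyAtom and the simplified Field are made up for illustration): the "empty"/"notEmpty" atoms now inspect the Field they are given and only produce the Range("") check when it is a String; for any other type they return false, so the condition is simply not used for index analysis.

#include <iostream>
#include <string>
#include <variant>

using Field = std::variant<std::string, long, double>;   /// toy stand-in for DB::Field

/// Models the "empty" atom: succeeds only for String values.
static bool tryEmptyAtom(const Field & value, std::string & out_range_bound)
{
    if (!std::holds_alternative<std::string>(value))
        return false;              /// non-String value (e.g. for a non-String key column): reject the atom
    out_range_bound = "";          /// empty(s) is treated like s = ''
    return true;
}

int main()
{
    std::string bound;
    std::cout << tryEmptyAtom(Field{std::string{}}, bound) << '\n';  /// 1
    std::cout << tryEmptyAtom(Field{42L}, bound) << '\n';            /// 0
}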

View File

@ -1151,7 +1151,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
if (num_streams <= 1 || sort_description.empty() || query_info.force_tree_shaped_pipeline)
if (num_streams <= 1 || sort_description.empty())
{
Pipe pipe(std::move(pipes), get_merging_processor());

View File

@ -80,28 +80,6 @@ struct SelectQueryInfo
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
/// Temporary flag is needed to support old pipeline with input streams.
/// If enabled, then pipeline returned by storage must be a tree.
/// Processors from the tree can't return ExpandPipeline status.
mutable bool force_tree_shaped_pipeline = false;
};
/// RAII class to enable force_tree_shaped_pipeline for SelectQueryInfo.
/// Looks awful, but I hope it's temporary.
struct ForceTreeShapedPipeline
{
explicit ForceTreeShapedPipeline(const SelectQueryInfo & info_) : info(info_)
{
force_tree_shaped_pipeline = info.force_tree_shaped_pipeline;
info.force_tree_shaped_pipeline = true;
}
~ForceTreeShapedPipeline() { info.force_tree_shaped_pipeline = force_tree_shaped_pipeline; }
private:
bool force_tree_shaped_pipeline;
const SelectQueryInfo & info;
};
}

View File

@ -234,7 +234,7 @@ Pipes StorageBuffer::read(
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
for (auto & pipe : pipes_from_buffers)
pipe = InterpreterSelectQuery(query_info.query, context, std::move(pipe), SelectQueryOptions(processed_stage)).executeWithProcessors().getPipe();
pipe = InterpreterSelectQuery(query_info.query, context, std::move(pipe), SelectQueryOptions(processed_stage)).execute().pipeline.getPipe();
if (query_info.prewhere_info)
{

View File

@ -238,20 +238,9 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
if (!storage)
{
if (query_info.force_tree_shaped_pipeline)
{
/// This flag means that pipeline must be tree-shaped,
/// so we can't enable processors for InterpreterSelectQuery here.
auto stream = InterpreterSelectQuery(modified_query_info.query, *modified_context, std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().in;
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
return pipes;
}
auto pipe = InterpreterSelectQuery(modified_query_info.query, *modified_context,
std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe();
SelectQueryOptions(processed_stage).analyze()).execute().pipeline.getPipe();
pipe.addInterpreterContext(modified_context);
pipes.emplace_back(std::move(pipe));
return pipes;
@ -276,15 +265,8 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
InterpreterSelectQuery interpreter{modified_query_info.query, *modified_context, SelectQueryOptions(processed_stage)};
if (query_info.force_tree_shaped_pipeline)
{
BlockInputStreamPtr stream = interpreter.execute().in;
Pipe pipe(std::make_shared<SourceFromInputStream>(std::move(stream)));
pipes.emplace_back(std::move(pipe));
}
else
{
Pipe pipe = interpreter.executeWithProcessors().getPipe();
Pipe pipe = interpreter.execute().pipeline.getPipe();
pipes.emplace_back(std::move(pipe));
}

View File

@ -65,42 +65,24 @@ Pipes StorageView::read(
current_inner_query = getRuntimeViewQuery(*query_info.query->as<const ASTSelectQuery>(), context);
InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);
/// FIXME: res may implicitly use some objects owned by the pipeline, but they will be destructed after return
if (query_info.force_tree_shaped_pipeline)
auto pipeline = interpreter.execute().pipeline;
/// It's expected that the columns read from storage are not constant,
/// because the method 'getSampleBlockForColumns' is used to obtain the structure of the result in InterpreterSelectQuery.
pipeline.addSimpleTransform([](const Block & header)
{
QueryPipeline pipeline;
BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
return std::make_shared<MaterializingTransform>(header);
});
for (auto & stream : streams)
{
stream = std::make_shared<MaterializingBlockInputStream>(stream);
stream = std::make_shared<ConvertingBlockInputStream>(stream, getSampleBlockForColumns(column_names),
ConvertingBlockInputStream::MatchColumnsMode::Name);
}
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}
else
/// And also convert to expected structure.
pipeline.addSimpleTransform([&](const Block & header)
{
auto pipeline = interpreter.executeWithProcessors();
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name);
});
/// It's expected that the columns read from storage are not constant,
/// because the method 'getSampleBlockForColumns' is used to obtain the structure of the result in InterpreterSelectQuery.
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
/// And also convert to expected structure.
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name);
});
pipes = std::move(pipeline).getPipes();
}
pipes = std::move(pipeline).getPipes();
return pipes;
}

View File

@ -144,7 +144,7 @@ def test_mysql_federated(mysql_server, server_address):
node.query('''CREATE TABLE mysql_federated.test (col UInt32) ENGINE = Log''', settings={"password": "123"})
node.query('''INSERT INTO mysql_federated.test VALUES (0), (1), (5)''', settings={"password": "123"})
code, (_, stderr) = mysql_server.exec_run('''
code, (stdout, stderr) = mysql_server.exec_run('''
mysql
-e "DROP SERVER IF EXISTS clickhouse;"
-e "CREATE SERVER clickhouse FOREIGN DATA WRAPPER mysql OPTIONS (USER 'default', PASSWORD '123', HOST '{host}', PORT {port}, DATABASE 'mysql_federated');"
@ -152,6 +152,9 @@ def test_mysql_federated(mysql_server, server_address):
-e "CREATE DATABASE mysql_federated;"
'''.format(host=server_address, port=server_port), demux=True)
if code != 0:
print(stdout)
print(stderr)
assert code == 0
code, (stdout, stderr) = mysql_server.exec_run('''

View File

@ -864,6 +864,8 @@ def test_double_move_while_select(started_cluster, name, positive):
thread = threading.Thread(target=long_select)
thread.start()
time.sleep(1)
node1.query("ALTER TABLE {name} MOVE PART '{part}' TO DISK 'jbod1'".format(name=name, part=parts[0]))
# Fill jbod1 to force ClickHouse to move partition 1 to 'external'.

View File

@ -9,5 +9,7 @@ test"string
"test_string"
"test\\"string"
"test\\"string"
"{"
"["
["]", "2", "3"]
{"nested" : [1,2,3]}

View File

@ -11,5 +11,7 @@ SELECT visitParamExtractRaw('{"myparam":"test_string"}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": "test_string"}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": "test\\"string"}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": "test\\"string", "other":123}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": "{"}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": "["}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": ["]", "2", "3"], "other":123}', 'myparam');
SELECT visitParamExtractRaw('{"myparam": {"nested" : [1,2,3]}, "other":123}', 'myparam');
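The expected output for these new cases is in the reference file directly above: visitParamExtractRaw returns the raw field value, so a lone "{" or "[" inside a string literal, as well as nested arrays and objects, come back verbatim.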

View File

@ -1,4 +1,4 @@
0
0
100000
200000
800000
1600000

View File

@ -54,7 +54,7 @@ EOL
echo "create table null_01278 as data_01278 Engine=Null();" | execute
for i in $(seq 1 $TEST_01278_PARTS); do
echo "create table part_01278_$i as data_01278 Engine=Buffer(currentDatabase(), null_01278, 1, 86400, 86400, 1e5, 1e6, 10e6, 100e6);"
echo "create materialized view mv_01278_$i to part_01278_$i as select * from data_01278 where key%$TEST_01278_PARTS+1 == $i;"
echo "create materialized view mv_01278_$i to part_01278_$i as select * from data_01278 where key%$TEST_01278_PARTS+1 != $i;"
done | execute
echo "create table out_01278 as data_01278 Engine=Merge(currentDatabase(), 'part_01278_');" | execute

View File

@ -0,0 +1,8 @@
Ok (1)
Ok (2)
2000000
1
Ok (3)
2000000
1
Ok (4)

View File

@ -0,0 +1,44 @@
SET min_execution_speed = 100000000000, timeout_before_checking_execution_speed = 0.1;
SELECT count() FROM system.numbers; -- { serverError 160 }
SELECT 'Ok (1)';
SET min_execution_speed = 0;
SET min_execution_speed_bytes = 800000000000, timeout_before_checking_execution_speed = 0.1;
SELECT count() FROM system.numbers; -- { serverError 160 }
SELECT 'Ok (2)';
SET min_execution_speed_bytes = 0;
SET max_execution_speed = 1000000;
SET max_block_size = 100;
CREATE TEMPORARY TABLE times (t DateTime);
INSERT INTO times SELECT now();
SELECT count() FROM numbers(2000000);
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;
SELECT 'Ok (3)';
SET max_execution_speed = 0;
SET max_execution_speed_bytes = 8000000;
TRUNCATE TABLE times;
INSERT INTO times SELECT now();
SELECT count() FROM numbers(2000000);
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;
SELECT 'Ok (4)';
SET max_execution_speed_bytes = 0;
-- Note that 'min_execution_speed' does not count sleeping due to throttling
-- with 'max_execution_speed' and similar limits like 'priority' and 'max_network_bandwidth'
-- Note: I have to disable this part of the test because it can actually work slower under sanitizers,
-- with debug builds and in the presence of random system hiccups in our CI environment.
--SET max_execution_speed = 1000000, min_execution_speed = 2000000;
-- And this query will work despite the fact that the above settings look contradictory.
--SELECT count() FROM numbers(1000000);
--SELECT 'Ok (5)';
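A quick sanity check of the numbers above, assuming numbers() yields 8-byte UInt64 values: at max_execution_speed = 1000000 rows/s, reading 2000000 rows takes at least about 2 seconds, and at max_execution_speed_bytes = 8000000 bytes/s the same 2000000 rows (about 16 MB) are likewise capped at roughly 2 seconds, so the max(t) - min(t) >= 1 assertions leave a safety margin.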

View File

@ -0,0 +1,2 @@
0
1

View File

@ -0,0 +1,15 @@
-- Limit to 10 MB/sec
SET max_network_bandwidth = 10000000;
-- Lower max_block_size, so we can start throttling sooner. Otherwise the query will be executed too quickly.
SET max_block_size = 100;
CREATE TEMPORARY TABLE times (t DateTime);
-- rand64 produces incompressible data. Each number will take 8 bytes of bandwidth.
-- This query should execute in no less than 1.6 seconds if throttled.
INSERT INTO times SELECT now();
SELECT sum(ignore(*)) FROM (SELECT rand64() FROM remote('127.0.0.{2,3}', numbers(2000000)));
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;
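Spelling out the arithmetic behind the 1.6-second figure: each of the two remote addresses produces 2000000 rand64 values of 8 bytes each, about 16 MB, and 16 MB at the 10 MB/s cap takes at least 1.6 seconds to transfer; the test only asserts a difference of at least 1 second to stay robust against timing noise.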

View File

@ -0,0 +1,19 @@
DROP TABLE IF EXISTS ES;
create table ES(A String) Engine=MergeTree order by tuple();
insert into ES select toString(number) from numbers(10000000);
SET max_execution_time = 100, max_execution_speed = 1000000;
SET max_threads = 1;
SET max_block_size = 1000000;
-- Exception about execution speed is not thrown from these queries.
SELECT * FROM ES LIMIT 1 format Null;
SELECT * FROM ES LIMIT 10 format Null;
SELECT * FROM ES LIMIT 100 format Null;
SELECT * FROM ES LIMIT 1000 format Null;
SELECT * FROM ES LIMIT 10000 format Null;
SELECT * FROM ES LIMIT 100000 format Null;
SELECT * FROM ES LIMIT 1000000 format Null;
DROP TABLE ES;

View File

@ -0,0 +1,50 @@
--- notEmpty
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- empty
[] 1
--- = []
[] 1
--- != []
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- > []
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- < []
--- >= []
[] 1
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- <= []
[] 1
---
--- notEmpty
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- empty
[] 1
--- = []
[] 1
--- != []
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- > []
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- < []
--- >= []
[] 1
['a'] 2
['a','b','c'] 3
['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'] 4
--- <= []
[] 1
---

View File

@ -0,0 +1,66 @@
drop table if exists count_lc_test;
CREATE TABLE count_lc_test
(
`s` LowCardinality(String),
`arr` Array(LowCardinality(String)),
`num` UInt64
)
ENGINE = MergeTree
ORDER BY (s, arr);
INSERT INTO count_lc_test(num, arr) VALUES (1,[]),(2,['a']),(3,['a','b','c']),(4,['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa']);
SELECT '--- notEmpty';
select * from count_lc_test where notEmpty(arr);
SELECT '--- empty';
select * from count_lc_test where empty(arr);
SELECT '--- = []';
select * from count_lc_test where arr = [];
SELECT '--- != []';
select * from count_lc_test where arr != [];
SELECT '--- > []';
select * from count_lc_test where arr > [];
SELECT '--- < []';
select * from count_lc_test where arr < [];
SELECT '--- >= []';
select * from count_lc_test where arr >= [];
SELECT '--- <= []';
select * from count_lc_test where arr <= [];
SELECT '---';
DROP TABLE count_lc_test;
drop table if exists count_lc_test;
CREATE TABLE count_lc_test
(
`s` LowCardinality(String),
`arr` Array(String),
`num` UInt64
)
ENGINE = MergeTree
ORDER BY (s, arr);
INSERT INTO count_lc_test(num, arr) VALUES (1,[]),(2,['a']),(3,['a','b','c']),(4,['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa']);
SELECT '--- notEmpty';
select * from count_lc_test where notEmpty(arr);
SELECT '--- empty';
select * from count_lc_test where empty(arr);
SELECT '--- = []';
select * from count_lc_test where arr = [];
SELECT '--- != []';
select * from count_lc_test where arr != [];
SELECT '--- > []';
select * from count_lc_test where arr > [];
SELECT '--- < []';
select * from count_lc_test where arr < [];
SELECT '--- >= []';
select * from count_lc_test where arr >= [];
SELECT '--- <= []';
select * from count_lc_test where arr <= [];
SELECT '---';
DROP TABLE count_lc_test;
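Presumably these queries exercise the KeyCondition change earlier in this commit: arr is part of the sorting key (ORDER BY (s, arr)), and without the new String type check a condition such as empty(arr) or notEmpty(arr) could be interpreted as a range over the empty string on that key column and prune the wrong granules, making the SELECTs above return wrong rows.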

View File

@ -0,0 +1,3 @@
2000000
1
1

View File

@ -0,0 +1,13 @@
SET max_execution_speed = 1000000, timeout_before_checking_execution_speed = 0.001, max_block_size = 100;
CREATE TEMPORARY TABLE times (t DateTime);
INSERT INTO times SELECT now();
SELECT count('special query for 01290_max_execution_speed_distributed') FROM remote('127.0.0.{2,3}', numbers(1000000));
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;
-- Check that the query was also throttled on "remote" servers.
SYSTEM FLUSH LOGS;
SELECT DISTINCT query_duration_ms >= 500 FROM system.query_log WHERE event_date >= yesterday() AND query LIKE '%special query for 01290_max_execution_speed_distributed%' AND type = 2;
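For reference: with max_execution_speed = 1000000 rows/s and the speed check starting after 1 ms, each remote scan of numbers(1000000) should take on the order of a second, so finding query_duration_ms >= 500 for the matching entries in system.query_log (type = 2, i.e. finished query events) confirms that throttling also applied on the remote servers.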

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS data;
DROP TABLE IF EXISTS dist;
create table data (key String) Engine=Memory();
create table dist (key LowCardinality(String)) engine=Distributed(test_cluster_two_shards, currentDatabase(), data);
insert into data values ('foo');
set distributed_aggregation_memory_efficient=1;
select * from dist group by key;
DROP TABLE data;
DROP TABLE dist;

View File

@ -0,0 +1 @@
(0,0) [(0,0),(10,0),(10,10),(0,10)] [[(20,20),(50,20),(50,50),(20,50)],[(30,30),(50,50),(50,30)]] [[[(0,0),(10,0),(10,10),(0,10)]],[[(20,20),(50,20),(50,50),(20,50)],[(30,30),(50,50),(50,30)]]]

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS geo;
SET allow_experimental_geo_types = 1;
CREATE TABLE geo (a Point, b Ring, c Polygon, d MultiPolygon) ENGINE=Memory();
INSERT INTO geo VALUES((0, 0), [(0, 0), (10, 0), (10, 10), (0, 10)], [[(20, 20), (50, 20), (50, 50), (20, 50)], [(30, 30), (50, 50), (50, 30)]], [[[(0, 0), (10, 0), (10, 10), (0, 10)]], [[(20, 20), (50, 20), (50, 50), (20, 50)],[(30, 30), (50, 50), (50, 30)]]]);
SELECT * from geo;

View File

@ -0,0 +1,5 @@
SELECT toIntervalSecond(now64()); -- { serverError 70 }
SELECT CAST(now64() AS IntervalSecond); -- { serverError 70 }
SELECT toIntervalSecond(now64()); -- { serverError 70 }
SELECT CAST(now64() AS IntervalSecond); -- { serverError 70 }

View File

@ -0,0 +1,4 @@
4392010
1
4392010
1

View File

@ -0,0 +1,16 @@
SET max_execution_speed = 4000000, timeout_before_checking_execution_speed = 0.001;
CREATE TEMPORARY TABLE times (t DateTime);
INSERT INTO times SELECT now();
SELECT count() FROM test.hits SAMPLE 1 / 2;
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;
TRUNCATE TABLE times;
INSERT INTO times SELECT now();
SELECT count() FROM merge(test, '^hits$') SAMPLE 1 / 2;
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;

View File

@ -8,5 +8,6 @@ ROOT_PATH=$(git rev-parse --show-toplevel)
EXCLUDE_DIRS='build/|integration/|widechar_width/|glibc-compatibility/|memcpy/|consistent-hashing'
find "${ROOT_PATH}" -name 'ya.make.in' | while read path; do
(cd $(dirname "${path}") && perl -pne 's/<\?(.+?)\?>/`$1`/e' < "${path}" > "${path/.in/}")
echo "# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it." > "${path/.in/}"
(cd $(dirname "${path}") && perl -pne 's/<\?(.+?)\?>/`$1`/e' < "${path}" >> "${path/.in/}")
done

View File

@ -22,6 +22,9 @@
<meta name="description" content="{{ description }}" />
<meta name="keywords"
content="ClickHouse, DBMS, OLAP, SQL, {{ _('open-source') }}, {{ _('relational') }}, {{ _('analytics') }}, {{ _('analytical') }}, {{ _('Big Data') }}, {{ _('web-analytics') }}" />
{% if config and (config.extra.single_page or config.extra.version_prefix) %}
<meta name="robots" content="noindex,follow" />
{% endif %}
{% if config and page %}
{% for code, name in config.extra.languages.items() %}

View File

@ -7,6 +7,9 @@
<link rel="canonical" href="{{ canonical_url or 'https://clickhouse.tech/' }}" />
<meta name="viewport" content="width=device-width,minimum-scale=1,initial-scale=1">
{% include "templates/docs/ld_json.html" %}
{% if config.extra.single_page or config.extra.version_prefix %}
<meta name="robots" content="noindex,follow" />
{% endif %}
<style amp-boilerplate>body{-webkit-animation:-amp-start 8s steps(1,end) 0s 1 normal both;-moz-animation:-amp-start 8s steps(1,end) 0s 1 normal both;-ms-animation:-amp-start 8s steps(1,end) 0s 1 normal both;animation:-amp-start 8s steps(1,end) 0s 1 normal both}@-webkit-keyframes -amp-start{from{visibility:hidden}to{visibility:visible}}@-moz-keyframes -amp-start{from{visibility:hidden}to{visibility:visible}}@-ms-keyframes -amp-start{from{visibility:hidden}to{visibility:visible}}@-o-keyframes -amp-start{from{visibility:hidden}to{visibility:visible}}@keyframes -amp-start{from{visibility:hidden}to{visibility:visible}}</style><noscript><style amp-boilerplate>body{-webkit-animation:none;-moz-animation:none;-ms-animation:none;animation:none}</style></noscript>
<style amp-custom>CUSTOM_CSS_PLACEHOLDER</style>
<script async custom-element="amp-analytics" src="https://cdn.ampproject.org/v0/amp-analytics-0.1.js"></script>