mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
commit
18a72fa91a
@ -365,7 +365,7 @@ private:
|
||||
Stopwatch watch;
|
||||
RemoteBlockInputStream stream(
|
||||
*(*connection_entries[connection_index]),
|
||||
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
|
||||
query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage);
|
||||
|
||||
Progress progress;
|
||||
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
|
||||
|
@ -850,9 +850,10 @@ bool TCPHandler::receivePacket()
|
||||
return true;
|
||||
|
||||
case Protocol::Client::Data:
|
||||
case Protocol::Client::Scalar:
|
||||
if (state.empty())
|
||||
receiveUnexpectedData();
|
||||
return receiveData();
|
||||
return receiveData(packet_type == Protocol::Client::Scalar);
|
||||
|
||||
case Protocol::Client::Ping:
|
||||
writeVarUInt(Protocol::Server::Pong, *out);
|
||||
@ -957,39 +958,44 @@ void TCPHandler::receiveUnexpectedQuery()
|
||||
throw NetException("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
}
|
||||
|
||||
bool TCPHandler::receiveData()
|
||||
bool TCPHandler::receiveData(bool scalar)
|
||||
{
|
||||
initBlockInput();
|
||||
|
||||
/// The name of the temporary table for writing data, default to empty string
|
||||
String external_table_name;
|
||||
readStringBinary(external_table_name, *in);
|
||||
String name;
|
||||
readStringBinary(name, *in);
|
||||
|
||||
/// Read one block from the network and write it down
|
||||
Block block = state.block_in->read();
|
||||
|
||||
if (block)
|
||||
{
|
||||
/// If there is an insert request, then the data should be written directly to `state.io.out`.
|
||||
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
|
||||
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
|
||||
{
|
||||
StoragePtr storage;
|
||||
/// If such a table does not exist, create it.
|
||||
if (!(storage = query_context->tryGetExternalTable(external_table_name)))
|
||||
{
|
||||
NamesAndTypesList columns = block.getNamesAndTypesList();
|
||||
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
storage->startup();
|
||||
query_context->addExternalTable(external_table_name, storage);
|
||||
}
|
||||
/// The data will be written directly to the table.
|
||||
state.io.out = storage->write(ASTPtr(), *query_context);
|
||||
}
|
||||
if (state.need_receive_data_for_input)
|
||||
state.block_for_input = block;
|
||||
if (scalar)
|
||||
query_context->addScalar(name, block);
|
||||
else
|
||||
state.io.out->write(block);
|
||||
{
|
||||
/// If there is an insert request, then the data should be written directly to `state.io.out`.
|
||||
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
|
||||
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
|
||||
{
|
||||
StoragePtr storage;
|
||||
/// If such a table does not exist, create it.
|
||||
if (!(storage = query_context->tryGetExternalTable(name)))
|
||||
{
|
||||
NamesAndTypesList columns = block.getNamesAndTypesList();
|
||||
storage = StorageMemory::create("_external", name, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
storage->startup();
|
||||
query_context->addExternalTable(name, storage);
|
||||
}
|
||||
/// The data will be written directly to the table.
|
||||
state.io.out = storage->write(ASTPtr(), *query_context);
|
||||
}
|
||||
if (state.need_receive_data_for_input)
|
||||
state.block_for_input = block;
|
||||
else
|
||||
state.io.out->write(block);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else
|
||||
|
@ -153,7 +153,7 @@ private:
|
||||
void receiveHello();
|
||||
bool receivePacket();
|
||||
void receiveQuery();
|
||||
bool receiveData();
|
||||
bool receiveData(bool scalar);
|
||||
bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
|
||||
void readData(const Settings & global_settings);
|
||||
std::tuple<size_t, int> getReadTimeouts(const Settings & global_settings);
|
||||
|
@ -30,6 +30,7 @@
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric SendScalars;
|
||||
extern const Metric SendExternalTables;
|
||||
}
|
||||
|
||||
@ -441,7 +442,7 @@ void Connection::sendCancel()
|
||||
}
|
||||
|
||||
|
||||
void Connection::sendData(const Block & block, const String & name)
|
||||
void Connection::sendData(const Block & block, const String & name, bool scalar)
|
||||
{
|
||||
//LOG_TRACE(log_wrapper.get(), "Sending data");
|
||||
|
||||
@ -455,7 +456,10 @@ void Connection::sendData(const Block & block, const String & name)
|
||||
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
|
||||
}
|
||||
|
||||
writeVarUInt(Protocol::Client::Data, *out);
|
||||
if (scalar)
|
||||
writeVarUInt(Protocol::Client::Scalar, *out);
|
||||
else
|
||||
writeVarUInt(Protocol::Client::Data, *out);
|
||||
writeStringBinary(name, *out);
|
||||
|
||||
size_t prev_bytes = out->count();
|
||||
@ -484,6 +488,44 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
|
||||
}
|
||||
|
||||
|
||||
/// Sends every scalar in `data` to the server as a Scalar packet (one sendData() call per entry,
/// keyed by the scalar's name) and writes transfer statistics to the debug log.
/// Does nothing when `data` is empty.
void Connection::sendScalarsData(Scalars & data)
{
    if (data.empty())
        return;

    Stopwatch watch;

    /// Snapshot the byte counters so that only the traffic produced by this call is reported.
    /// NOTE(review): the null checks are kept from the original code; presumably the buffers
    /// may not exist before the first sendData() call - confirm.
    const size_t out_bytes_before = out ? out->count() : 0;
    const size_t maybe_compressed_out_bytes_before = maybe_compressed_out ? maybe_compressed_out->count() : 0;
    size_t rows = 0;

    CurrentMetrics::Increment metric_increment{CurrentMetrics::SendScalars};

    for (const auto & [scalar_name, scalar_block] : data)
    {
        rows += scalar_block.rows();
        sendData(scalar_block, scalar_name, true /* scalar */);
    }

    const size_t out_bytes = out->count() - out_bytes_before;
    const size_t maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes_before;

    /// Read the stopwatch exactly once so that every rate below refers to the same interval.
    const double elapsed = watch.elapsedSeconds();

    /// Converting a non-finite quotient to an integer is undefined behaviour,
    /// so a zero-length interval is reported as a zero rate.
    const auto per_second = [elapsed](double amount) { return elapsed > 0 ? amount / elapsed : 0.0; };

    const double uncompressed_mib = maybe_compressed_out_bytes / 1048576.0;
    const double sent_mib = out_bytes / 1048576.0;

    std::stringstream msg;
    msg << std::fixed << std::setprecision(3);
    msg << "Sent data for " << data.size() << " scalars, total " << rows << " rows in " << elapsed << " sec., "
        << static_cast<size_t>(per_second(static_cast<double>(rows))) << " rows/sec., "
        << uncompressed_mib << " MiB (" << per_second(uncompressed_mib) << " MiB/sec.)";

    if (compression == Protocol::Compression::Enable)
        msg << ", compressed " << static_cast<double>(maybe_compressed_out_bytes) / out_bytes << " times to "
            << sent_mib << " MiB (" << per_second(sent_mib) << " MiB/sec.)";
    else
        msg << ", no compression.";

    LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
|
||||
|
||||
|
||||
void Connection::sendExternalTablesData(ExternalTablesData & data)
|
||||
{
|
||||
if (data.empty())
|
||||
|
@ -133,7 +133,9 @@ public:
|
||||
|
||||
void sendCancel();
|
||||
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
|
||||
void sendData(const Block & block, const String & name = "");
|
||||
void sendData(const Block & block, const String & name = "", bool scalar = false);
|
||||
/// Send all scalars.
|
||||
void sendScalarsData(Scalars & data);
|
||||
/// Send all contents of external (temporary) tables.
|
||||
void sendExternalTablesData(ExternalTablesData & data);
|
||||
|
||||
|
@ -51,6 +51,21 @@ MultiplexedConnections::MultiplexedConnections(
|
||||
active_connection_count = connections.size();
|
||||
}
|
||||
|
||||
/// Forwards `data` to every replica whose connection pointer is non-null.
/// Holds cancel_mutex for the whole operation and throws LOGICAL_ERROR
/// if the query has not been sent yet.
void MultiplexedConnections::sendScalarsData(Scalars & data)
{
    std::lock_guard lock(cancel_mutex);

    if (!sent_query)
        throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);

    for (ReplicaState & replica : replica_states)
    {
        /// Replicas without a connection are skipped.
        if (replica.connection == nullptr)
            continue;

        replica.connection->sendScalarsData(data);
    }
}
|
||||
|
||||
void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
|
@ -27,6 +27,8 @@ public:
|
||||
std::vector<IConnectionPool::Entry> && connections,
|
||||
const Settings & settings_, const ThrottlerPtr & throttler_);
|
||||
|
||||
/// Send all scalars to replicas.
|
||||
void sendScalarsData(Scalars & data);
|
||||
/// Send all content of external tables to replicas.
|
||||
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
M(OpenFileForWrite, "Number of files open for writing") \
|
||||
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
|
||||
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \
|
||||
M(SendScalars, "Number of connections that are sending data for scalars to remote servers.") \
|
||||
M(SendExternalTables, "Number of connections that are sending data for external tables to remote servers. External tables are used to implement GLOBAL IN and GLOBAL JOIN operators with distributed subqueries.") \
|
||||
M(QueryThread, "Number of query processing threads") \
|
||||
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
|
||||
|
@ -459,6 +459,8 @@ namespace ErrorCodes
|
||||
extern const int DICTIONARY_ACCESS_DENIED = 482;
|
||||
extern const int TOO_MANY_REDIRECTS = 483;
|
||||
extern const int INTERNAL_REDIS_ERROR = 484;
|
||||
extern const int SCALAR_ALREADY_EXISTS = 485;
|
||||
extern const int UNKNOWN_SCALAR = 486;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -112,7 +112,8 @@ namespace Protocol
|
||||
Cancel = 3, /// Cancel the query execution.
|
||||
Ping = 4, /// Check that connection to the server is alive.
|
||||
TablesStatusRequest = 5, /// Check status of tables on the server.
|
||||
KeepAlive = 6 /// Keep the connection alive
|
||||
KeepAlive = 6, /// Keep the connection alive
|
||||
Scalar = 7 /// A block of data (compressed or not).
|
||||
};
|
||||
|
||||
inline const char * toString(UInt64 packet)
|
||||
|
@ -379,6 +379,8 @@ struct Settings : public SettingsCollection<Settings>
|
||||
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
|
||||
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
|
||||
\
|
||||
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.") \
|
||||
\
|
||||
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
||||
\
|
||||
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13") \
|
||||
|
@ -23,8 +23,8 @@ namespace ErrorCodes
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
Connection & connection,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
|
||||
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
|
||||
{
|
||||
if (settings)
|
||||
context.setSettings(*settings);
|
||||
@ -38,8 +38,8 @@ RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
std::vector<IConnectionPool::Entry> && connections,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
|
||||
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
|
||||
{
|
||||
if (settings)
|
||||
context.setSettings(*settings);
|
||||
@ -54,8 +54,8 @@ RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
const ConnectionPoolWithFailoverPtr & pool,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
|
||||
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
|
||||
{
|
||||
if (settings)
|
||||
context.setSettings(*settings);
|
||||
@ -120,6 +120,11 @@ void RemoteBlockInputStream::cancel(bool kill)
|
||||
tryCancel("Cancelling query");
|
||||
}
|
||||
|
||||
/// Passes the `scalars` member (copied from the constructor argument) to
/// multiplexed_connections so that it is sent over the underlying connections.
void RemoteBlockInputStream::sendScalars()
|
||||
{
|
||||
multiplexed_connections->sendScalarsData(scalars);
|
||||
}
|
||||
|
||||
void RemoteBlockInputStream::sendExternalTables()
|
||||
{
|
||||
size_t count = multiplexed_connections->size();
|
||||
@ -308,6 +313,8 @@ void RemoteBlockInputStream::sendQuery()
|
||||
established = false;
|
||||
sent_query = true;
|
||||
|
||||
if (settings.enable_scalar_subquery_optimization)
|
||||
sendScalars();
|
||||
sendExternalTables();
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,7 @@ public:
|
||||
RemoteBlockInputStream(
|
||||
Connection & connection,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
|
||||
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
|
||||
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
/// Accepts several connections already taken from pool.
|
||||
@ -33,7 +33,7 @@ public:
|
||||
RemoteBlockInputStream(
|
||||
std::vector<IConnectionPool::Entry> && connections,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
|
||||
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
|
||||
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
/// Takes a pool and gets one or several connections from it.
|
||||
@ -41,7 +41,7 @@ public:
|
||||
RemoteBlockInputStream(
|
||||
const ConnectionPoolWithFailoverPtr & pool,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
|
||||
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
|
||||
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
~RemoteBlockInputStream() override;
|
||||
@ -71,6 +71,9 @@ public:
|
||||
Block getHeader() const override { return header; }
|
||||
|
||||
protected:
|
||||
/// Send all scalars to remote servers
|
||||
void sendScalars();
|
||||
|
||||
/// Send all temporary tables to remote servers
|
||||
void sendExternalTables();
|
||||
|
||||
@ -103,6 +106,8 @@ private:
|
||||
String query_id = "";
|
||||
Context context;
|
||||
|
||||
/// Scalars needed to be sent to remote servers
|
||||
Scalars scalars;
|
||||
/// Temporary tables needed to be sent to remote servers
|
||||
Tables external_tables;
|
||||
QueryProcessingStage::Enum stage;
|
||||
|
68
dbms/src/Functions/getScalar.cpp
Normal file
68
dbms/src/Functions/getScalar.cpp
Normal file
@ -0,0 +1,68 @@
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Core/Field.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
}
|
||||
|
||||
/** Get scalar value of sub queries from query context via IAST::Hash.
|
||||
*/
|
||||
class FunctionGetScalar : public IFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "__getScalar";
|
||||
static FunctionPtr create(const Context & context)
|
||||
{
|
||||
return std::make_shared<FunctionGetScalar>(context);
|
||||
}
|
||||
|
||||
FunctionGetScalar(const Context & context_) : context(context_) {}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
size_t getNumberOfArguments() const override
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
|
||||
{
|
||||
if (arguments.size() != 1 || !isString(arguments[0].type) || !isColumnConst(*arguments[0].column))
|
||||
throw Exception("Function " + getName() + " accepts one const string argument", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
auto scalar_name = assert_cast<const ColumnConst &>(*arguments[0].column).getField().get<String>();
|
||||
scalar = context.getScalar(scalar_name).getByPosition(0);
|
||||
return scalar.type;
|
||||
}
|
||||
|
||||
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
|
||||
{
|
||||
block.getByPosition(result).column = ColumnConst::create(scalar.column, input_rows_count);
|
||||
}
|
||||
|
||||
private:
|
||||
mutable ColumnWithTypeAndName scalar;
|
||||
const Context & context;
|
||||
};
|
||||
|
||||
|
||||
/// Registers FunctionGetScalar in the given function factory.
void registerFunctionGetScalar(FunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<FunctionGetScalar>();
|
||||
}
|
||||
|
||||
}
|
@ -52,6 +52,7 @@ void registerFunctionEvalMLMethod(FunctionFactory &);
|
||||
void registerFunctionBasename(FunctionFactory &);
|
||||
void registerFunctionTransform(FunctionFactory &);
|
||||
void registerFunctionGetMacro(FunctionFactory &);
|
||||
void registerFunctionGetScalar(FunctionFactory &);
|
||||
|
||||
#if USE_ICU
|
||||
void registerFunctionConvertCharset(FunctionFactory &);
|
||||
@ -106,6 +107,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
|
||||
registerFunctionBasename(factory);
|
||||
registerFunctionTransform(factory);
|
||||
registerFunctionGetMacro(factory);
|
||||
registerFunctionGetScalar(factory);
|
||||
|
||||
#if USE_ICU
|
||||
registerFunctionConvertCharset(factory);
|
||||
|
@ -33,11 +33,13 @@ SelectStreamFactory::SelectStreamFactory(
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
QualifiedTableName main_table_,
|
||||
const Scalars & scalars_,
|
||||
const Tables & external_tables_)
|
||||
: header(header_),
|
||||
processed_stage{processed_stage_},
|
||||
main_table(std::move(main_table_)),
|
||||
table_func_ptr{nullptr},
|
||||
scalars{scalars_},
|
||||
external_tables{external_tables_}
|
||||
{
|
||||
}
|
||||
@ -46,10 +48,12 @@ SelectStreamFactory::SelectStreamFactory(
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
ASTPtr table_func_ptr_,
|
||||
const Scalars & scalars_,
|
||||
const Tables & external_tables_)
|
||||
: header(header_),
|
||||
processed_stage{processed_stage_},
|
||||
table_func_ptr{table_func_ptr_},
|
||||
scalars{scalars_},
|
||||
external_tables{external_tables_}
|
||||
{
|
||||
}
|
||||
@ -92,7 +96,8 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto emplace_remote_stream = [&]()
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(
|
||||
shard_info.pool, query, header, context, nullptr, throttler, scalars, external_tables, processed_stage);
|
||||
stream->setPoolMode(PoolMode::GET_MANY);
|
||||
if (!table_func_ptr)
|
||||
stream->setMainTable(main_table);
|
||||
@ -190,8 +195,8 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto lazily_create_stream = [
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
|
||||
local_delay]()
|
||||
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
||||
stage = processed_stage, local_delay]()
|
||||
-> BlockInputStreamPtr
|
||||
{
|
||||
auto current_settings = context.getSettingsRef();
|
||||
@ -233,7 +238,7 @@ void SelectStreamFactory::createForShard(
|
||||
connections.emplace_back(std::move(try_result.entry));
|
||||
|
||||
return std::make_shared<RemoteBlockInputStream>(
|
||||
std::move(connections), query, header, context, nullptr, throttler, external_tables, stage);
|
||||
std::move(connections), query, header, context, nullptr, throttler, scalars, external_tables, stage);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -18,6 +18,7 @@ public:
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
QualifiedTableName main_table_,
|
||||
const Scalars & scalars_,
|
||||
const Tables & external_tables);
|
||||
|
||||
/// TableFunction in a query.
|
||||
@ -25,6 +26,7 @@ public:
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
ASTPtr table_func_ptr_,
|
||||
const Scalars & scalars_,
|
||||
const Tables & external_tables_);
|
||||
|
||||
void createForShard(
|
||||
@ -38,6 +40,7 @@ private:
|
||||
QueryProcessingStage::Enum processed_stage;
|
||||
QualifiedTableName main_table;
|
||||
ASTPtr table_func_ptr;
|
||||
Scalars scalars;
|
||||
Tables external_tables;
|
||||
};
|
||||
|
||||
|
@ -88,6 +88,8 @@ namespace ErrorCodes
|
||||
extern const int SESSION_IS_LOCKED;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int SCALAR_ALREADY_EXISTS;
|
||||
extern const int UNKNOWN_SCALAR;
|
||||
}
|
||||
|
||||
|
||||
@ -862,6 +864,21 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const
|
||||
}
|
||||
|
||||
|
||||
/// Returns a reference to the map of scalars stored in this context (name -> Block).
/// NOTE(review): unlike getExternalTables(), no lock is taken here - confirm this is intended.
const Scalars & Context::getScalars() const
|
||||
{
|
||||
return scalars;
|
||||
}
|
||||
|
||||
|
||||
/// Returns the block stored under `name`.
/// Throws UNKNOWN_SCALAR when no scalar with that name has been added to this context.
const Block & Context::getScalar(const String & name) const
{
    const auto found = scalars.find(name);
    if (found != scalars.end())
        return found->second;

    throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)", ErrorCodes::UNKNOWN_SCALAR);
}
|
||||
|
||||
|
||||
Tables Context::getExternalTables() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
@ -959,6 +976,19 @@ void Context::addExternalTable(const String & table_name, const StoragePtr & sto
|
||||
external_tables[table_name] = std::pair(storage, ast);
|
||||
}
|
||||
|
||||
|
||||
void Context::addScalar(const String & name, const Block & block)
|
||||
{
|
||||
scalars[name] = block;
|
||||
}
|
||||
|
||||
|
||||
bool Context::hasScalar(const String & name) const
|
||||
{
|
||||
return scalars.count(name);
|
||||
}
|
||||
|
||||
|
||||
StoragePtr Context::tryRemoveExternalTable(const String & table_name)
|
||||
{
|
||||
TableAndCreateASTs::const_iterator it = external_tables.find(table_name);
|
||||
|
@ -105,6 +105,9 @@ using InputInitializer = std::function<void(Context &, const StoragePtr &)>;
|
||||
/// Callback for reading blocks of data from client for function input()
|
||||
using InputBlocksReader = std::function<Block(Context &)>;
|
||||
|
||||
/// Scalar results of sub queries
|
||||
using Scalars = std::map<String, Block>;
|
||||
|
||||
/// An empty interface for an arbitrary object that may be attached by a shared pointer
|
||||
/// to query context, when using ClickHouse as a library.
|
||||
struct IHostContext
|
||||
@ -144,6 +147,7 @@ private:
|
||||
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
|
||||
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
|
||||
TableAndCreateASTs external_tables; /// Temporary tables.
|
||||
Scalars scalars;
|
||||
StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views
|
||||
Tables table_function_results; /// Temporary tables obtained by execution of table functions. Keyed by AST tree id.
|
||||
Context * query_context = nullptr;
|
||||
@ -264,11 +268,15 @@ public:
|
||||
void assertDatabaseDoesntExist(const String & database_name) const;
|
||||
void checkDatabaseAccessRights(const std::string & database_name) const;
|
||||
|
||||
const Scalars & getScalars() const;
|
||||
const Block & getScalar(const String & name) const;
|
||||
Tables getExternalTables() const;
|
||||
StoragePtr tryGetExternalTable(const String & table_name) const;
|
||||
StoragePtr getTable(const String & database_name, const String & table_name) const;
|
||||
StoragePtr tryGetTable(const String & database_name, const String & table_name) const;
|
||||
void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {});
|
||||
void addScalar(const String & name, const Block & block);
|
||||
bool hasScalar(const String & name) const;
|
||||
StoragePtr tryRemoveExternalTable(const String & table_name);
|
||||
|
||||
StoragePtr executeTableFunction(const ASTPtr & table_expression);
|
||||
|
@ -12,8 +12,11 @@
|
||||
#include <Interpreters/addTypeConversionToAST.h>
|
||||
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <DataStreams/materializeBlock.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
|
||||
#include <Columns/ColumnTuple.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -53,69 +56,98 @@ void ExecuteScalarSubqueriesMatcher::visit(ASTPtr & ast, Data & data)
|
||||
visit(*t, ast, data);
|
||||
}
|
||||
|
||||
/// Converting to literal values might take a fair amount of overhead when the value is large, (e.g.
|
||||
/// Array, BitMap, etc.), This conversion is required for constant folding, index lookup, branch
|
||||
/// elimination. However, these optimizations should never be related to large values, thus we
|
||||
/// blacklist them here.
|
||||
static bool worthConvertingToLiteral(const Block & scalar)
|
||||
{
|
||||
auto scalar_type_name = scalar.safeGetByPosition(0).type->getFamilyName();
|
||||
std::set<String> useless_literal_types = {"Array", "Tuple", "AggregateFunction", "Function", "Set", "LowCardinality"};
|
||||
return !useless_literal_types.count(scalar_type_name);
|
||||
}
|
||||
|
||||
void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr & ast, Data & data)
|
||||
{
|
||||
Context subquery_context = data.context;
|
||||
Settings subquery_settings = data.context.getSettings();
|
||||
subquery_settings.max_result_rows = 1;
|
||||
subquery_settings.extremes = 0;
|
||||
subquery_context.setSettings(subquery_settings);
|
||||
auto hash = subquery.getTreeHash();
|
||||
auto scalar_query_hash_str = toString(hash.first) + "_" + toString(hash.second);
|
||||
|
||||
ASTPtr subquery_select = subquery.children.at(0);
|
||||
BlockIO res = InterpreterSelectWithUnionQuery(
|
||||
subquery_select, subquery_context, SelectQueryOptions(QueryProcessingStage::Complete, data.subquery_depth + 1)).execute();
|
||||
|
||||
Block block;
|
||||
try
|
||||
Block scalar;
|
||||
if (data.context.hasQueryContext() && data.context.getQueryContext().hasScalar(scalar_query_hash_str))
|
||||
scalar = data.context.getQueryContext().getScalar(scalar_query_hash_str);
|
||||
else if (data.scalars.count(scalar_query_hash_str))
|
||||
scalar = data.scalars[scalar_query_hash_str];
|
||||
else
|
||||
{
|
||||
block = res.in->read();
|
||||
Context subquery_context = data.context;
|
||||
Settings subquery_settings = data.context.getSettings();
|
||||
subquery_settings.max_result_rows = 1;
|
||||
subquery_settings.extremes = 0;
|
||||
subquery_context.setSettings(subquery_settings);
|
||||
|
||||
if (!block)
|
||||
ASTPtr subquery_select = subquery.children.at(0);
|
||||
BlockIO res = InterpreterSelectWithUnionQuery(
|
||||
subquery_select, subquery_context, SelectQueryOptions(QueryProcessingStage::Complete, data.subquery_depth + 1)).execute();
|
||||
|
||||
Block block;
|
||||
try
|
||||
{
|
||||
/// Interpret subquery with empty result as Null literal
|
||||
auto ast_new = std::make_unique<ASTLiteral>(Null());
|
||||
ast_new->setAlias(ast->tryGetAlias());
|
||||
ast = std::move(ast_new);
|
||||
return;
|
||||
block = res.in->read();
|
||||
|
||||
if (!block)
|
||||
{
|
||||
/// Interpret subquery with empty result as Null literal
|
||||
auto ast_new = std::make_unique<ASTLiteral>(Null());
|
||||
ast_new->setAlias(ast->tryGetAlias());
|
||||
ast = std::move(ast_new);
|
||||
return;
|
||||
}
|
||||
|
||||
if (block.rows() != 1 || res.in->read())
|
||||
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (e.code() == ErrorCodes::TOO_MANY_ROWS)
|
||||
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
if (block.rows() != 1 || res.in->read())
|
||||
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (e.code() == ErrorCodes::TOO_MANY_ROWS)
|
||||
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
|
||||
block = materializeBlock(block);
|
||||
size_t columns = block.columns();
|
||||
|
||||
if (columns == 1)
|
||||
scalar = block;
|
||||
else
|
||||
throw;
|
||||
{
|
||||
|
||||
ColumnWithTypeAndName ctn;
|
||||
ctn.type = std::make_shared<DataTypeTuple>(block.getDataTypes());
|
||||
ctn.column = ColumnTuple::create(block.getColumns());
|
||||
scalar.insert(ctn);
|
||||
}
|
||||
}
|
||||
|
||||
size_t columns = block.columns();
|
||||
if (columns == 1)
|
||||
const Settings & settings = data.context.getSettingsRef();
|
||||
|
||||
// Always convert to literals when there is no query context.
|
||||
if (!settings.enable_scalar_subquery_optimization || worthConvertingToLiteral(scalar) || !data.context.hasQueryContext())
|
||||
{
|
||||
auto lit = std::make_unique<ASTLiteral>((*block.safeGetByPosition(0).column)[0]);
|
||||
auto lit = std::make_unique<ASTLiteral>((*scalar.safeGetByPosition(0).column)[0]);
|
||||
lit->alias = subquery.alias;
|
||||
lit->prefer_alias_to_column_name = subquery.prefer_alias_to_column_name;
|
||||
ast = addTypeConversionToAST(std::move(lit), block.safeGetByPosition(0).type->getName());
|
||||
ast = addTypeConversionToAST(std::move(lit), scalar.safeGetByPosition(0).type->getName());
|
||||
}
|
||||
else
|
||||
{
|
||||
auto tuple = std::make_shared<ASTFunction>();
|
||||
tuple->alias = subquery.alias;
|
||||
ast = tuple;
|
||||
tuple->name = "tuple";
|
||||
auto exp_list = std::make_shared<ASTExpressionList>();
|
||||
tuple->arguments = exp_list;
|
||||
tuple->children.push_back(tuple->arguments);
|
||||
|
||||
exp_list->children.resize(columns);
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
{
|
||||
exp_list->children[i] = addTypeConversionToAST(
|
||||
std::make_unique<ASTLiteral>((*block.safeGetByPosition(i).column)[0]),
|
||||
block.safeGetByPosition(i).type->getName());
|
||||
}
|
||||
auto func = makeASTFunction("__getScalar", std::make_shared<ASTLiteral>(scalar_query_hash_str));
|
||||
func->alias = subquery.alias;
|
||||
func->prefer_alias_to_column_name = subquery.prefer_alias_to_column_name;
|
||||
ast = std::move(func);
|
||||
}
|
||||
|
||||
data.scalars[scalar_query_hash_str] = std::move(scalar);
|
||||
}
|
||||
|
||||
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Interpreters/InDepthNodeVisitor.h>
|
||||
|
||||
namespace DB
|
||||
@ -36,6 +37,7 @@ public:
|
||||
{
|
||||
const Context & context;
|
||||
size_t subquery_depth;
|
||||
Scalars & scalars;
|
||||
};
|
||||
|
||||
static bool needChildVisit(ASTPtr & node, const ASTPtr &);
|
||||
|
@ -305,6 +305,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
|
||||
syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze(
|
||||
query_ptr, source_header.getNamesAndTypesList(), required_result_column_names, storage, NamesAndTypesList());
|
||||
|
||||
/// Save scalar sub queries's results in the query context
|
||||
if (context.hasQueryContext())
|
||||
for (const auto & it : syntax_analyzer_result->getScalars())
|
||||
context.getQueryContext().addScalar(it.first, it.second);
|
||||
|
||||
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
|
||||
query_ptr, syntax_analyzer_result, context,
|
||||
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
|
||||
|
@ -220,10 +220,10 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
|
||||
}
|
||||
|
||||
/// Replacing scalar subqueries with constant values.
|
||||
void executeScalarSubqueries(ASTPtr & query, const Context & context, size_t subquery_depth)
|
||||
void executeScalarSubqueries(ASTPtr & query, const Context & context, size_t subquery_depth, Scalars & scalars)
|
||||
{
|
||||
LogAST log;
|
||||
ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth};
|
||||
ExecuteScalarSubqueriesVisitor::Data visitor_data{context, subquery_depth, scalars};
|
||||
ExecuteScalarSubqueriesVisitor(visitor_data, log.stream()).visit(query);
|
||||
}
|
||||
|
||||
@ -871,7 +871,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
|
||||
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
|
||||
|
||||
/// Executing scalar subqueries - replacing them with constant values.
|
||||
executeScalarSubqueries(query, context, subquery_depth);
|
||||
executeScalarSubqueries(query, context, subquery_depth, result.scalars);
|
||||
|
||||
/// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
|
||||
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Interpreters/Aliases.h>
|
||||
#include <Interpreters/SelectQueryOptions.h>
|
||||
@ -14,6 +15,7 @@ class ASTFunction;
|
||||
class AnalyzedJoin;
|
||||
class Context;
|
||||
struct SelectQueryOptions;
|
||||
using Scalars = std::map<String, Block>;
|
||||
|
||||
struct SyntaxAnalyzerResult
|
||||
{
|
||||
@ -43,8 +45,12 @@ struct SyntaxAnalyzerResult
|
||||
/// Predicate optimizer overrides the sub queries
|
||||
bool rewrite_subqueries = false;
|
||||
|
||||
/// Results of scalar sub queries
|
||||
Scalars scalars;
|
||||
|
||||
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
|
||||
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
|
||||
const Scalars & getScalars() const { return scalars; }
|
||||
};
|
||||
|
||||
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
|
||||
|
@ -323,11 +323,13 @@ BlockInputStreams StorageDistributed::read(
|
||||
Block header =
|
||||
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock();
|
||||
|
||||
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
|
||||
|
||||
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
|
||||
? ClusterProxy::SelectStreamFactory(
|
||||
header, processed_stage, remote_table_function_ptr, context.getExternalTables())
|
||||
header, processed_stage, remote_table_function_ptr, scalars, context.getExternalTables())
|
||||
: ClusterProxy::SelectStreamFactory(
|
||||
header, processed_stage, QualifiedTableName{remote_database, remote_table}, context.getExternalTables());
|
||||
header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, context.getExternalTables());
|
||||
|
||||
if (settings.optimize_skip_unused_shards)
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user