Merge branch 'master' into fix_deadlock_system_logs_startup

This commit is contained in:
alesapin 2020-05-21 22:57:52 +03:00 committed by GitHub
commit 59b3bc0c05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
169 changed files with 1264 additions and 380 deletions

View File

@ -5,6 +5,7 @@ RUN apt-get --allow-unauthenticated update -y && apt-get install --yes wget gnup
RUN wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | apt-key add -
RUN echo "deb [trusted=yes] http://apt.llvm.org/eoan/ llvm-toolchain-eoan-10 main" >> /etc/apt/sources.list
RUN apt-get --allow-unauthenticated update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get --allow-unauthenticated install --yes --no-install-recommends \
@ -17,6 +18,14 @@ RUN apt-get --allow-unauthenticated update -y \
apt-transport-https \
ca-certificates
# Special dpkg-deb (https://github.com/ClickHouse-Extras/dpkg) version which is able
# to compress files using pigz (https://zlib.net/pigz/) instead of gzip.
# Significantly increase deb packaging speed and compatible with old systems
RUN curl -O https://clickhouse-builds.s3.yandex.net/utils/dpkg-deb
RUN chmod +x dpkg-deb
RUN cp dpkg-deb /usr/bin
# Libraries from OS are only needed to test the "unbundled" build (that is not used in production).
RUN apt-get --allow-unauthenticated update -y \
&& env DEBIAN_FRONTEND=noninteractive \
@ -74,12 +83,6 @@ RUN apt-get --allow-unauthenticated update -y \
libldap2-dev
# Special dpkg-deb (https://github.com/ClickHouse-Extras/dpkg) version which is able
# to compress files using pigz (https://zlib.net/pigz/) instead of gzip.
# Significantly increase deb packaging speed and compatible with old systems
RUN curl -O https://clickhouse-builds.s3.yandex.net/utils/dpkg-deb
RUN chmod +x dpkg-deb
RUN cp dpkg-deb /usr/bin
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-10 /usr/bin/ld.lld

View File

@ -151,7 +151,7 @@ for query_index, q in enumerate(test_queries):
# use the test name + the test-wide query index.
query_display_name = q
if len(query_display_name) > 1000:
query_display_name = f'{query_display_name[:1000]}...({i})'
query_display_name = f'{query_display_name[:1000]}...({query_index})'
print(f'display-name\t{query_index}\t{tsv_escape(query_display_name)}')

View File

@ -72,7 +72,7 @@ Examples:
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String

View File

@ -174,7 +174,7 @@ Upd. Всё ещё ждём удаление старого кода, котор
### 2.3. Перенос столбцового ser/de из DataType в Column {#perenos-stolbtsovogo-serde-iz-datatype-v-column}
В очереди.
В очереди. Антон Попов.
### 2.4. Перевод LowCardinality из DataType в Column. Добавление ColumnSparse {#perevod-lowcardinality-iz-datatype-v-column-dobavlenie-columnsparse}
@ -977,10 +977,10 @@ Q2.
[Виталий Баранов](https://github.com/vitlibar) и Денис Глазачев, Altinity. Требует 12.1.
### 12.6. Информация о пользователях и квотах в системной таблице {#informatsiia-o-polzovateliakh-i-kvotakh-v-sistemnoi-tablitse}
### 12.6. + Информация о пользователях и квотах в системной таблице {#informatsiia-o-polzovateliakh-i-kvotakh-v-sistemnoi-tablitse}
[Виталий Баранов](https://github.com/vitlibar). Требует 12.1.
Есть pull request. Q2.
Есть pull request. Q2. Готово.
## 13. Разделение ресурсов, multi-tenancy {#razdelenie-resursov-multi-tenancy}

View File

@ -58,17 +58,6 @@ def build_for_lang(lang, args):
'custom_dir': os.path.join(os.path.dirname(__file__), '..', args.theme_dir),
'language': lang,
'direction': 'rtl' if lang == 'fa' else 'ltr',
# TODO: cleanup
'feature': {
'tabs': False
},
'palette': {
'primary': 'white',
'accent': 'white'
},
'font': False,
'logo': 'images/logo.svg',
'favicon': 'assets/images/favicon.ico',
'static_templates': ['404.html'],
'extra': {
'now': int(time.mktime(datetime.datetime.now().timetuple())) # TODO better way to avoid caching

View File

@ -46,7 +46,7 @@ sudo yum-config-manager --add-repo https://repo.yandex.ru/clickhouse/rpm/stable/
sudo yum install clickhouse-server clickhouse-client
```
您也可以从此处手动下载和安装软件包https://repo.yandex.ru/clickhouse/rpm/stable/x86\_64。
您也可以从此处手动下载和安装软件包https://repo.yandex.ru/clickhouse/rpm/stable/x86_64。
### 来自Docker {#from-docker-image}

View File

@ -1,3 +1,7 @@
if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()
# 'clickhouse' binary is a multi purpose tool,
# that contain multiple execution modes (client, server, etc.)
# each of them is built and linked as a separate library, defined below.

View File

@ -289,7 +289,7 @@ private:
connection_entries.emplace_back(std::make_shared<Entry>(
connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
pool.scheduleOrThrowOnError([this, connection_entries]() mutable { thread(connection_entries); });
}
}
catch (...)

View File

@ -485,7 +485,7 @@ private:
history_file = config().getString("history_file");
else
{
auto history_file_from_env = getenv("CLICKHOUSE_HISTORY_FILE");
auto * history_file_from_env = getenv("CLICKHOUSE_HISTORY_FILE");
if (history_file_from_env)
history_file = history_file_from_env;
else if (!home_path.empty())
@ -1480,7 +1480,7 @@ private:
"\033[1m↗\033[0m",
};
auto indicator = indicators[increment % 8];
const char * indicator = indicators[increment % 8];
if (!send_logs && written_progress_chars)
message << '\r';

View File

@ -51,7 +51,7 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
{
std::string prompt{"Password for user (" + user + "): "};
char buf[1000] = {};
if (auto result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
if (auto * result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
password = result;
}

View File

@ -5,6 +5,7 @@
#include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
#include <common/LineReader.h>
#include <thread>
namespace DB

View File

@ -442,7 +442,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
/// Collect all shards that contain partition piece number piece_number.
Strings piece_status_paths;
for (auto & shard : shards_with_partition)
for (const auto & shard : shards_with_partition)
{
ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
ShardPartitionPiece & shard_partition_piece = task_shard_partition.pieces[piece_number];
@ -702,7 +702,7 @@ ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, new_columns);
if (auto indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
if (const auto * indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
new_columns_list->set(new_columns_list->indices, indices->clone());
new_query.replace(new_query.columns_list, new_columns_list);

View File

@ -94,7 +94,7 @@ void ClusterCopierApp::mainImpl()
StatusFile status_file(process_path + "/status");
ThreadStatus thread_status;
auto log = &logger();
auto * log = &logger();
LOG_INFO(log, "Starting clickhouse-copier ("
<< "id " << process_id << ", "
<< "host_id " << host_id << ", "

View File

@ -260,7 +260,7 @@ ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std
return res;
res.is_remote = 1;
for (auto & replica : replicas)
for (const auto & replica : replicas)
{
if (isLocalAddress(DNSResolver::instance().resolveHost(replica.host_name)))
{
@ -270,7 +270,7 @@ ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std
}
res.hostname_difference = std::numeric_limits<size_t>::max();
for (auto & replica : replicas)
for (const auto & replica : replicas)
{
size_t difference = getHostNameDifference(local_hostname, replica.host_name);
res.hostname_difference = std::min(difference, res.hostname_difference);

View File

@ -8,7 +8,6 @@
#include <Poco/NullChannel.h>
#include <Databases/DatabaseMemory.h>
#include <Storages/System/attachSystemTables.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>

View File

@ -4,13 +4,12 @@
#include <Poco/Util/Application.h>
#include <memory>
#include <loggers/Loggers.h>
#include <Interpreters/Context.h>
namespace DB
{
class Context;
/// Lightweight Application for clickhouse-local
/// No networking, no extra configs and working directories, no pid and status files, no dictionaries, no logging.
/// Quiet mode by default

View File

@ -937,10 +937,10 @@ public:
if (typeid_cast<const DataTypeFixedString *>(&data_type))
return std::make_unique<FixedStringModel>(seed);
if (auto type = typeid_cast<const DataTypeArray *>(&data_type))
if (const auto * type = typeid_cast<const DataTypeArray *>(&data_type))
return std::make_unique<ArrayModel>(get(*type->getNestedType(), seed, markov_model_params));
if (auto type = typeid_cast<const DataTypeNullable *>(&data_type))
if (const auto * type = typeid_cast<const DataTypeNullable *>(&data_type))
return std::make_unique<NullableModel>(get(*type->getNestedType(), seed, markov_model_params));
throw Exception("Unsupported data type", ErrorCodes::NOT_IMPLEMENTED);

View File

@ -24,8 +24,8 @@ namespace
query.table_id.table_name = table_name;
query.columns = std::make_shared<ASTExpressionList>(',');
query.children.push_back(query.columns);
for (size_t i = 0; i < columns.size(); ++i)
query.columns->children.emplace_back(std::make_shared<ASTIdentifier>(columns[i].name));
for (const auto & column : columns)
query.columns->children.emplace_back(std::make_shared<ASTIdentifier>(column.name));
std::stringstream ss;
IAST::FormatSettings settings(ss, true);

View File

@ -195,7 +195,7 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
std::vector<ReadBufferPtr> read_buffers;
std::vector<ReadBuffer *> read_buffers_raw_ptr;
auto cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed.get());
auto * cascade_buffer = typeid_cast<CascadeWriteBuffer *>(used_output.out_maybe_delayed_and_compressed.get());
if (!cascade_buffer)
throw Exception("Expected CascadeWriteBuffer", ErrorCodes::LOGICAL_ERROR);
@ -383,7 +383,7 @@ void HTTPHandler::processQuery(
{
auto push_memory_buffer_and_continue = [next_buffer = used_output.out_maybe_compressed] (const WriteBufferPtr & prev_buf)
{
auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
auto * prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
if (!prev_memory_buffer)
throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR);

View File

@ -28,7 +28,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string &
{
}
Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler(const Poco::Net::HTTPServerRequest & request) // override
Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler(const Poco::Net::HTTPServerRequest & request)
{
LOG_TRACE(log, "HTTP Request for " << name << ". "
<< "Method: " << request.getMethod()
@ -40,7 +40,7 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand
for (auto & handler_factory : child_factories)
{
auto handler = handler_factory->createRequestHandler(request);
auto * handler = handler_factory->createRequestHandler(request);
if (handler != nullptr)
return handler;
}
@ -72,80 +72,98 @@ HTTPRequestHandlerFactoryMain::TThis * HTTPRequestHandlerFactoryMain::addHandler
static inline auto createHandlersFactoryFromConfig(IServer & server, const std::string & name, const String & prefix)
{
auto main_handler_factory = new HTTPRequestHandlerFactoryMain(name);
auto main_handler_factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
try
Poco::Util::AbstractConfiguration::Keys keys;
server.config().keys(prefix, keys);
for (const auto & key : keys)
{
Poco::Util::AbstractConfiguration::Keys keys;
server.config().keys(prefix, keys);
if (!startsWith(key, "rule"))
throw Exception("Unknown element in config: " + prefix + "." + key + ", must be 'rule'", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
for (const auto & key : keys)
{
if (!startsWith(key, "rule"))
throw Exception("Unknown element in config: " + prefix + "." + key + ", must be 'rule'", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
const auto & handler_type = server.config().getString(prefix + "." + key + ".handler.type", "");
const auto & handler_type = server.config().getString(prefix + "." + key + ".handler.type", "");
if (handler_type == "static")
main_handler_factory->addHandler(createStaticHandlerFactory(server, prefix + "." + key));
else if (handler_type == "dynamic_query_handler")
main_handler_factory->addHandler(createDynamicHandlerFactory(server, prefix + "." + key));
else if (handler_type == "predefined_query_handler")
main_handler_factory->addHandler(createPredefinedHandlerFactory(server, prefix + "." + key));
else if (handler_type.empty())
throw Exception("Handler type in config is not specified here: " +
prefix + "." + key + ".handler.type", ErrorCodes::INVALID_CONFIG_PARAMETER);
else
throw Exception("Unknown handler type '" + handler_type +"' in config here: " +
prefix + "." + key + ".handler.type",ErrorCodes::INVALID_CONFIG_PARAMETER);
}
return main_handler_factory;
}
catch (...)
{
delete main_handler_factory;
throw;
if (handler_type == "static")
main_handler_factory->addHandler(createStaticHandlerFactory(server, prefix + "." + key));
else if (handler_type == "dynamic_query_handler")
main_handler_factory->addHandler(createDynamicHandlerFactory(server, prefix + "." + key));
else if (handler_type == "predefined_query_handler")
main_handler_factory->addHandler(createPredefinedHandlerFactory(server, prefix + "." + key));
else if (handler_type.empty())
throw Exception("Handler type in config is not specified here: " +
prefix + "." + key + ".handler.type", ErrorCodes::INVALID_CONFIG_PARAMETER);
else
throw Exception("Unknown handler type '" + handler_type +"' in config here: " +
prefix + "." + key + ".handler.type",ErrorCodes::INVALID_CONFIG_PARAMETER);
}
return main_handler_factory.release();
}
static const auto ping_response_expression = "Ok.\n";
static const auto root_response_expression = "config://http_server_default_response";
static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics)
static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(
IServer & server, const std::string & name, AsynchronousMetrics & async_metrics)
{
if (server.config().has("http_handlers"))
return createHandlersFactoryFromConfig(server, name, "http_handlers");
else
{
auto factory = (new HTTPRequestHandlerFactoryMain(name))
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, root_response_expression))
->attachStrictPath("/")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, ping_response_expression))
->attachStrictPath("/ping")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>(server))
->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, "query"))->allowPostAndGetParamsRequest());
auto factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto root_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, root_response_expression);
root_handler->attachStrictPath("/")->allowGetAndHeadRequest();
factory->addHandler(root_handler.release());
auto ping_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, ping_response_expression);
ping_handler->attachStrictPath("/ping")->allowGetAndHeadRequest();
factory->addHandler(ping_handler.release());
auto replicas_status_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>>(server);
replicas_status_handler->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest();
factory->addHandler(replicas_status_handler.release());
auto query_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>>(server, "query");
query_handler->allowPostAndGetParamsRequest();
factory->addHandler(query_handler.release());
/// We check that prometheus handler will be served on current (default) port.
/// Otherwise it will be created separately, see below.
if (server.config().has("prometheus") && server.config().getInt("prometheus.port", 0) == 0)
factory->addHandler((new HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)))
->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest());
{
auto prometheus_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics));
prometheus_handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest();
factory->addHandler(prometheus_handler.release());
}
return factory;
return factory.release();
}
}
static inline Poco::Net::HTTPRequestHandlerFactory * createInterserverHTTPHandlerFactory(IServer & server, const std::string & name)
{
return (new HTTPRequestHandlerFactoryMain(name))
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, root_response_expression))
->attachStrictPath("/")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(server, ping_response_expression))
->attachStrictPath("/ping")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>(server))
->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest())
->addHandler((new HandlingRuleHTTPHandlerFactory<InterserverIOHTTPHandler>(server))->allowPostAndGetParamsRequest());
auto factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto root_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, root_response_expression);
root_handler->attachStrictPath("/")->allowGetAndHeadRequest();
factory->addHandler(root_handler.release());
auto ping_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, ping_response_expression);
ping_handler->attachStrictPath("/ping")->allowGetAndHeadRequest();
factory->addHandler(ping_handler.release());
auto replicas_status_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>>(server);
replicas_status_handler->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest();
factory->addHandler(replicas_status_handler.release());
auto main_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<InterserverIOHTTPHandler>>(server);
main_handler->allowPostAndGetParamsRequest();
factory->addHandler(main_handler.release());
return factory.release();
}
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name)
@ -155,9 +173,14 @@ Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, As
else if (name == "InterserverIOHTTPHandler-factory" || name == "InterserverIOHTTPSHandler-factory")
return createInterserverHTTPHandlerFactory(server, name);
else if (name == "PrometheusHandler-factory")
return (new HTTPRequestHandlerFactoryMain(name))->addHandler((new HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)))
->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest());
{
auto factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto handler = std::make_unique<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics));
handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest();
factory->addHandler(handler.release());
return factory.release();
}
throw Exception("LOGICAL ERROR: Unknown HTTP handler factory name.", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -46,7 +46,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
for (auto iterator = db.second->getTablesIterator(); iterator->isValid(); iterator->next())
{
auto & table = iterator->table();
const auto & table = iterator->table();
StorageReplicatedMergeTree * table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
if (!table_replicated)

View File

@ -278,8 +278,11 @@ void TCPHandler::runImpl()
sendLogs();
sendEndOfStream();
query_scope.reset();
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
state.reset();
query_scope.reset();
}
catch (const Exception & e)
{
@ -359,8 +362,11 @@ void TCPHandler::runImpl()
try
{
query_scope.reset();
/// QueryState should be cleared before QueryScope, since otherwise
/// the MemoryTracker will be wrong for possible deallocations.
/// (i.e. deallocations from the Aggregator with two-level aggregation)
state.reset();
query_scope.reset();
}
catch (...)
{

View File

@ -4,6 +4,7 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include <Common/assert_cast.h>
#include <Columns/ColumnsCommon.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -53,13 +54,13 @@ protected:
static void initFlag(AggregateDataPtr place) noexcept
{
if (result_is_nullable)
if constexpr (result_is_nullable)
place[0] = 0;
}
static void setFlag(AggregateDataPtr place) noexcept
{
if (result_is_nullable)
if constexpr (result_is_nullable)
place[0] = 1;
}
@ -72,7 +73,7 @@ public:
AggregateFunctionNullBase(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params)
: IAggregateFunctionHelper<Derived>(arguments, params), nested_function{nested_function_}
{
if (result_is_nullable)
if constexpr (result_is_nullable)
prefix_size = nested_function->alignOfData();
else
prefix_size = 0;
@ -128,7 +129,7 @@ public:
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
bool flag = getFlag(place);
if (result_is_nullable)
if constexpr (result_is_nullable)
writeBinary(flag, buf);
if (flag)
nested_function->serialize(nestedPlace(place), buf);
@ -137,7 +138,7 @@ public:
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
bool flag = 1;
if (result_is_nullable)
if constexpr (result_is_nullable)
readBinary(flag, buf);
if (flag)
{
@ -148,7 +149,7 @@ public:
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
{
if (result_is_nullable)
if constexpr (result_is_nullable)
{
ColumnNullable & to_concrete = assert_cast<ColumnNullable &>(to);
if (getFlag(place))
@ -194,13 +195,26 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
if (!column->isNullAt(row_num))
{
this->setFlag(place);
const IColumn * nested_column = &column->getNestedColumn();
this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena);
}
}
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const override
{
const ColumnNullable * column = assert_cast<const ColumnNullable *>(columns[0]);
const IColumn * nested_column = &column->getNestedColumn();
const UInt8 * null_map = column->getNullMapData().data();
this->nested_function->addBatchSinglePlaceNotNull(batch_size, this->nestedPlace(place), &nested_column, null_map, arena);
if constexpr (result_is_nullable)
if (!memoryIsByte(null_map, batch_size, 1))
this->setFlag(place);
}
};

View File

@ -20,11 +20,72 @@ struct AggregateFunctionSumData
{
T sum{};
void add(T value)
void ALWAYS_INLINE add(T value)
{
sum += value;
}
/// Vectorized version
template <typename Value>
void NO_INLINE addMany(const Value * __restrict ptr, size_t count)
{
/// Compiler cannot unroll this loop, do it manually.
/// (at least for floats, most likely due to the lack of -fassociative-math)
/// Something around the number of SSE registers * the number of elements fit in register.
constexpr size_t unroll_count = 128 / sizeof(T);
T partial_sums[unroll_count]{};
const auto * end = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
partial_sums[i] += ptr[i];
ptr += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
sum += partial_sums[i];
while (ptr < end)
{
sum += *ptr;
++ptr;
}
}
template <typename Value>
void NO_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
{
constexpr size_t unroll_count = 128 / sizeof(T);
T partial_sums[unroll_count]{};
const auto * end = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
if (!null_map[i])
partial_sums[i] += ptr[i];
ptr += unroll_count;
null_map += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
sum += partial_sums[i];
while (ptr < end)
{
if (!*null_map)
sum += *ptr;
++ptr;
++null_map;
}
}
void merge(const AggregateFunctionSumData & rhs)
{
sum += rhs.sum;
@ -55,21 +116,95 @@ struct AggregateFunctionSumKahanData
T sum{};
T compensation{};
void add(T value)
template <typename Value>
void ALWAYS_INLINE addImpl(Value value, T & out_sum, T & out_compensation)
{
auto compensated_value = value - compensation;
auto new_sum = sum + compensated_value;
compensation = (new_sum - sum) - compensated_value;
sum = new_sum;
auto compensated_value = value - out_compensation;
auto new_sum = out_sum + compensated_value;
out_compensation = (new_sum - out_sum) - compensated_value;
out_sum = new_sum;
}
void ALWAYS_INLINE add(T value)
{
addImpl(value, sum, compensation);
}
/// Vectorized version
template <typename Value>
void NO_INLINE addMany(const Value * __restrict ptr, size_t count)
{
/// Less than in ordinary sum, because the algorithm is more complicated and too large loop unrolling is questionable.
/// But this is just a guess.
constexpr size_t unroll_count = 4;
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
addImpl(ptr[i], partial_sums[i], partial_compensations[i]);
ptr += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
{
addImpl(*ptr, sum, compensation);
++ptr;
}
}
template <typename Value>
void NO_INLINE addManyNotNull(const Value * __restrict ptr, const UInt8 * __restrict null_map, size_t count)
{
constexpr size_t unroll_count = 4;
T partial_sums[unroll_count]{};
T partial_compensations[unroll_count]{};
const auto * end = ptr + count;
const auto * unrolled_end = ptr + (count / unroll_count * unroll_count);
while (ptr < unrolled_end)
{
for (size_t i = 0; i < unroll_count; ++i)
if (!null_map[i])
addImpl(ptr[i], partial_sums[i], partial_compensations[i]);
ptr += unroll_count;
null_map += unroll_count;
}
for (size_t i = 0; i < unroll_count; ++i)
mergeImpl(sum, compensation, partial_sums[i], partial_compensations[i]);
while (ptr < end)
{
if (!*null_map)
addImpl(*ptr, sum, compensation);
++ptr;
++null_map;
}
}
void ALWAYS_INLINE mergeImpl(T & to_sum, T & to_compensation, T from_sum, T from_compensation)
{
auto raw_sum = to_sum + from_sum;
auto rhs_compensated = raw_sum - to_sum;
/// Kahan summation is tricky because it depends on non-associativity of float arithmetic.
/// Do not simplify this expression if you are not sure.
auto compensations = ((from_sum - rhs_compensated) + (to_sum - (raw_sum - rhs_compensated))) + compensation + from_compensation;
to_sum = raw_sum + compensations;
to_compensation = compensations - (to_sum - raw_sum);
}
void merge(const AggregateFunctionSumKahanData & rhs)
{
auto raw_sum = sum + rhs.sum;
auto rhs_compensated = raw_sum - sum;
auto compensations = ((rhs.sum - rhs_compensated) + (sum - (raw_sum - rhs_compensated))) + compensation + rhs.compensation;
sum = raw_sum + compensations;
compensation = compensations - (sum - raw_sum);
mergeImpl(sum, compensation, rhs.sum, rhs.compensation);
}
void write(WriteBuffer & buf) const
@ -141,6 +276,20 @@ public:
this->data(place).add(column.getData()[row_num]);
}
/// Vectorized version when there is no GROUP BY keys.
void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena *) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addMany(column.getData().data(), batch_size);
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena *) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
this->data(place).addManyNotNull(column.getData().data(), null_map, batch_size);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));

View File

@ -145,6 +145,11 @@ public:
*/
virtual void addBatchSinglePlace(size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena) const = 0;
/** The same for single place when need to aggregate only filtered data.
*/
virtual void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const = 0;
/** In addition to addBatch, this method collects multiple rows of arguments into array "places"
* as long as they are between offsets[i-1] and offsets[i]. This is used for arrayReduce and
* -Array combinator. It might also be used generally to break data dependency when array
@ -201,6 +206,14 @@ public:
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
void addBatchSinglePlaceNotNull(
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, const UInt8 * null_map, Arena * arena) const override
{
for (size_t i = 0; i < batch_size; ++i)
if (!null_map[i])
static_cast<const Derived *>(this)->add(place, columns, i, arena);
}
void addBatchArray(
size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, const UInt64 * offsets, Arena * arena)
const override

View File

@ -50,6 +50,8 @@ class Connection;
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
using Scalars = std::map<String, Block>;
/// Packet that could be received from server.
struct Packet

View File

@ -56,8 +56,8 @@ int main(int, char **)
MutableColumnPtr mut = IColumn::mutate(std::move(y));
mut->set(2);
std::cerr << "refcounts: " << x->use_count() << ", " << y->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << y.get() << ", " << mut.get() << "\n";
std::cerr << "refcounts: " << x->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << mut.get() << "\n";
y = std::move(mut);
}
@ -75,8 +75,8 @@ int main(int, char **)
MutableColumnPtr mut = IColumn::mutate(std::move(y));
mut->set(3);
std::cerr << "refcounts: " << x->use_count() << ", " << y->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << y.get() << ", " << mut.get() << "\n";
std::cerr << "refcounts: " << x->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << mut.get() << "\n";
y = std::move(mut);
}

View File

@ -75,8 +75,8 @@ int main(int, char **)
MutableColumnPtr mut = IColumn::mutate(std::move(y));
mut->set(2);
std::cerr << "refcounts: " << x->use_count() << ", " << y->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << y.get() << ", " << mut.get() << "\n";
std::cerr << "refcounts: " << x->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << mut.get() << "\n";
y = std::move(mut);
}
@ -94,8 +94,8 @@ int main(int, char **)
MutableColumnPtr mut = IColumn::mutate(std::move(y));
mut->set(3);
std::cerr << "refcounts: " << x->use_count() << ", " << y->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << y.get() << ", " << mut.get() << "\n";
std::cerr << "refcounts: " << x->use_count() << ", " << mut->use_count() << "\n";
std::cerr << "addresses: " << x.get() << ", " << ", " << mut.get() << "\n";
y = std::move(mut);
}

View File

@ -52,6 +52,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, "The maximum block size for insertion, if we control the creation of blocks for insertion.", 0) \
M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.", 0) \
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \
M(SettingUInt64, min_insert_block_size_rows_for_materialized_views, 0, "Like min_insert_block_size_rows, but applied only during pushing to MATERIALIZED VIEW (default: min_insert_block_size_rows)", 0) \
M(SettingUInt64, min_insert_block_size_bytes_for_materialized_views, 0, "Like min_insert_block_size_bytes, but applied only during pushing to MATERIALIZED VIEW (default: min_insert_block_size_bytes)", 0) \
M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
M(SettingUInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \

View File

@ -3,12 +3,12 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Columns/ColumnConst.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/Context.h>
namespace DB
{
class Context;
/** This stream adds three types of columns into block
* 1. Columns, that are missed inside request, but present in table without defaults (missed columns)

View File

@ -2,12 +2,13 @@
#include <DataStreams/IBlockInputStream.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/Context.h>
namespace DB
{
class Context;
/// Adds defaults to columns using BlockDelayedDefaults bitmask attached to Block by child InputStream.
class AddingDefaultsBlockInputStream : public IBlockInputStream
{

View File

@ -1,4 +1,5 @@
#include <Interpreters/Set.h>
#include <Interpreters/Context.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>

View File

@ -8,7 +8,6 @@
#include <IO/ReadBuffer.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -5,6 +5,7 @@
#include <DataTypes/NestedUtils.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
@ -40,10 +41,20 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
/// We need special context for materialized views insertions
if (!dependencies.empty())
{
views_context = std::make_unique<Context>(context);
select_context = std::make_unique<Context>(context);
insert_context = std::make_unique<Context>(context);
const auto & insert_settings = insert_context->getSettingsRef();
// Do not deduplicate insertions into MV if the main insertion is Ok
if (disable_deduplication_for_children)
views_context->setSetting("insert_deduplicate", false);
insert_context->setSetting("insert_deduplicate", false);
// Separate min_insert_block_size_rows/min_insert_block_size_bytes for children
if (insert_settings.min_insert_block_size_rows_for_materialized_views.changed)
insert_context->setSetting("min_insert_block_size_rows", insert_settings.min_insert_block_size_rows_for_materialized_views.value);
if (insert_settings.min_insert_block_size_bytes_for_materialized_views.changed)
insert_context->setSetting("min_insert_block_size_bytes", insert_settings.min_insert_block_size_bytes_for_materialized_views.value);
}
for (const auto & database_table : dependencies)
@ -67,7 +78,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
insert->table_id = inner_table_id;
/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, *views_context, SelectQueryOptions().analyze())
auto header = InterpreterSelectQuery(query, *select_context, SelectQueryOptions().analyze())
.getSampleBlock();
/// Insert only columns returned by select.
@ -81,14 +92,14 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
insert->columns = std::move(list);
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, *views_context);
InterpreterInsertQuery interpreter(insert_query_ptr, *insert_context);
BlockIO io = interpreter.execute();
out = io.out;
}
else if (dynamic_cast<const StorageLiveView *>(dependent_table.get()))
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr(), true);
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *insert_context, ASTPtr(), true);
else
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr());
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *insert_context, ASTPtr());
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr});
}
@ -258,7 +269,7 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
/// but it will contain single block (that is INSERT-ed into main table).
/// InterpreterSelectQuery will do processing of alias columns.
Context local_context = *views_context;
Context local_context = *select_context;
local_context.addViewSource(
StorageValues::create(
storage->getStorageID(), storage->getColumns(), block, storage->getVirtuals()));

View File

@ -44,7 +44,8 @@ private:
};
std::vector<ViewInfo> views;
std::unique_ptr<Context> views_context;
std::unique_ptr<Context> select_context;
std::unique_ptr<Context> insert_context;
void process(const Block & block, size_t view_num);
};

View File

@ -4,6 +4,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Common/Throttler.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClientInfo.h>
namespace DB

View File

@ -376,8 +376,10 @@ void registerDataTypeString(DataTypeFactory & factory)
/// These synonyms are added for compatibility.
factory.registerAlias("CHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("NCHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("CHARACTER", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("VARCHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("NVARCHAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("VARCHAR2", "String", DataTypeFactory::CaseInsensitive); /// Oracle
factory.registerAlias("TEXT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("TINYTEXT", "String", DataTypeFactory::CaseInsensitive);

View File

@ -8,6 +8,7 @@
#include <Parsers/formatAST.h>
#include <Common/renameat2.h>
#include <Storages/StorageMaterializedView.h>
#include <Interpreters/Context.h>
#include <filesystem>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Databases/DatabaseOnDisk.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
@ -10,6 +9,7 @@ namespace DB
class DatabaseLazyIterator;
class Context;
/** Lazy engine of databases.
* Works like DatabaseOrdinary, but stores in memory only cache.

View File

@ -5,6 +5,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Storages/IStorage.h>
#include <Poco/File.h>
#include <filesystem>
namespace DB
@ -84,4 +85,10 @@ UUID DatabaseMemory::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseMemory::drop(const Context & context)
{
/// Remove data on explicit DROP DATABASE
std::filesystem::remove_all(context.getPath() + data_path);
}
}

View File

@ -46,6 +46,8 @@ public:
UUID tryGetTableUUID(const String & table_name) const override;
void drop(const Context & context) override;
private:
String data_path;
using NameToASTCreate = std::unordered_map<String, ASTPtr>;

View File

@ -5,14 +5,16 @@
#include <mysqlxx/Pool.h>
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
#include <memory>
#include <Parsers/ASTCreateQuery.h>
#include <Common/ThreadPool.h>
namespace DB
{
class Context;
/** Real-time access to table list and table structure from remote MySQL
* It doesn't make any manipulations with filesystem.
* All tables are created by calling code after real-time pull-out structure from remote MySQL

View File

@ -3,7 +3,6 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/IStorage.h>
@ -11,6 +10,8 @@
namespace DB
{
class Context;
std::pair<String, StoragePtr> createTableFromAST(
ASTCreateQuery ast_create_query,
const String & database_name,

View File

@ -5,6 +5,9 @@
namespace DB
{
class Context;
class ExternalDictionariesLoader;
class DatabaseWithDictionaries : public DatabaseOnDisk
{

View File

@ -5,6 +5,7 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
#include <Interpreters/Context.h>
#include <set>

View File

@ -1,6 +1,5 @@
#pragma once
#include <Interpreters/Context.h>
#include <Disks/DiskFactory.h>
#include <Disks/IDisk.h>
@ -10,6 +9,7 @@
namespace DB
{
class Context;
class DiskSelector;
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;

View File

@ -7,6 +7,11 @@
#include <Poco/Net/HTTPResponse.h>
#include <common/logger_useful.h>
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DB::S3
{
ProxyResolverConfiguration::ProxyResolverConfiguration(const Poco::URI & endpoint_, String proxy_scheme_, unsigned proxy_port_)
@ -30,13 +35,16 @@ Aws::Client::ClientConfigurationPerRequest ProxyResolverConfiguration::getConfig
Aws::Client::ClientConfigurationPerRequest cfg;
try
{
/// It should be just empty GET / request.
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_1_1);
/// It should be just empty GET request.
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, endpoint.getPath(), Poco::Net::HTTPRequest::HTTP_1_1);
session->sendRequest(request);
Poco::Net::HTTPResponse response;
auto & response_body_stream = session->receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw Exception("Proxy resolver returned not OK status: " + response.getReason(), ErrorCodes::BAD_ARGUMENTS);
String proxy_host;
/// Read proxy host as string from response body.
Poco::StreamCopier::copyToString(response_body_stream, proxy_host);

View File

@ -6,7 +6,7 @@ namespace DB::S3
{
/**
* Proxy configuration where proxy host is obtained each time from specified endpoint.
* For each request to S3 it makes GET request to specified endpoint and reads proxy host from a response body.
* For each request to S3 it makes GET request to specified endpoint URL and reads proxy host from a response body.
* Specified scheme and port added to obtained proxy host to form completed proxy URL.
*/
class ProxyResolverConfiguration : public ProxyConfiguration

View File

@ -37,13 +37,14 @@ namespace
void checkRemoveAccess(IDisk & disk) { disk.remove("test_acl"); }
std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(const Poco::Util::AbstractConfiguration * proxy_resolver_config)
std::shared_ptr<S3::ProxyResolverConfiguration> getProxyResolverConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_resolver_config)
{
auto endpoint = Poco::URI(proxy_resolver_config->getString("endpoint"));
auto proxy_scheme = proxy_resolver_config->getString("proxy_scheme");
auto endpoint = Poco::URI(proxy_resolver_config.getString(prefix + ".endpoint"));
auto proxy_scheme = proxy_resolver_config.getString(prefix + ".proxy_scheme");
if (proxy_scheme != "http" && proxy_scheme != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy resolver config: " + proxy_scheme, ErrorCodes::BAD_ARGUMENTS);
auto proxy_port = proxy_resolver_config->getUInt("proxy_port");
auto proxy_port = proxy_resolver_config.getUInt(prefix + ".proxy_port");
LOG_DEBUG(
&Logger::get("DiskS3"), "Configured proxy resolver: " << endpoint.toString() << ", Scheme: " << proxy_scheme << ", Port: " << proxy_port);
@ -51,16 +52,17 @@ namespace
return std::make_shared<S3::ProxyResolverConfiguration>(endpoint, proxy_scheme, proxy_port);
}
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(const Poco::Util::AbstractConfiguration * proxy_config)
std::shared_ptr<S3::ProxyListConfiguration> getProxyListConfiguration(
const String & prefix, const Poco::Util::AbstractConfiguration & proxy_config)
{
std::vector<String> keys;
proxy_config->keys(keys);
proxy_config.keys(prefix, keys);
std::vector<Poco::URI> proxies;
for (const auto & key : keys)
if (startsWith(key, "uri"))
{
Poco::URI proxy_uri(proxy_config->getString(key));
Poco::URI proxy_uri(proxy_config.getString(prefix + "." + key));
if (proxy_uri.getScheme() != "http" && proxy_uri.getScheme() != "https")
throw Exception("Only HTTP/HTTPS schemas allowed in proxy uri: " + proxy_uri.toString(), ErrorCodes::BAD_ARGUMENTS);
@ -78,25 +80,23 @@ namespace
return nullptr;
}
std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const Poco::Util::AbstractConfiguration * config)
std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & prefix, const Poco::Util::AbstractConfiguration & config)
{
if (!config->has("proxy"))
if (!config.has(prefix + ".proxy"))
return nullptr;
const auto * proxy_config = config->createView("proxy");
std::vector<String> config_keys;
proxy_config->keys(config_keys);
config.keys(prefix + ".proxy", config_keys);
if (auto resolver_configs = std::count(config_keys.begin(), config_keys.end(), "resolver"))
{
if (resolver_configs > 1)
throw Exception("Multiple proxy resolver configurations aren't allowed", ErrorCodes::BAD_ARGUMENTS);
return getProxyResolverConfiguration(proxy_config->createView("resolver"));
return getProxyResolverConfiguration(prefix + ".proxy.resolver", config);
}
return getProxyListConfiguration(proxy_config);
return getProxyListConfiguration(prefix + ".proxy", config);
}
}
@ -107,27 +107,25 @@ void registerDiskS3(DiskFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr {
const auto * disk_config = config.createView(config_prefix);
Poco::File disk{context.getPath() + "disks/" + name};
disk.createDirectories();
Aws::Client::ClientConfiguration cfg;
S3::URI uri(Poco::URI(disk_config->getString("endpoint")));
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
cfg.endpointOverride = uri.endpoint;
auto proxy_config = getProxyConfiguration(disk_config);
auto proxy_config = getProxyConfiguration(config_prefix, config);
if (proxy_config)
cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
auto client = S3::ClientFactory::instance().create(
cfg,
disk_config->getString("access_key_id", ""),
disk_config->getString("secret_access_key", ""));
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""));
String metadata_path = context.getPath() + "disks/" + name + "/";

View File

@ -18,6 +18,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <string>
#include <memory>
@ -44,7 +45,7 @@ namespace
{
template <typename Polygon, typename PointInPolygonImpl>
ColumnPtr callPointInPolygonImplWithPool(const IColumn & x, const IColumn & y, Polygon & polygon)
UInt8 callPointInPolygonImplWithPool(Float64 x, Float64 y, Polygon & polygon)
{
using Pool = ObjectPoolMap<PointInPolygonImpl, std::string>;
/// C++11 has thread-safe function-local statics on most modern compilers.
@ -63,19 +64,19 @@ ColumnPtr callPointInPolygonImplWithPool(const IColumn & x, const IColumn & y, P
std::string serialized_polygon = serialize(polygon);
auto impl = known_polygons.get(serialized_polygon, factory);
return pointInPolygon(x, y, *impl);
return impl->contains(x, y);
}
template <typename Polygon, typename PointInPolygonImpl>
ColumnPtr callPointInPolygonImpl(const IColumn & x, const IColumn & y, Polygon & polygon)
UInt8 callPointInPolygonImpl(Float64 x, Float64 y, Polygon & polygon)
{
PointInPolygonImpl impl(polygon);
return pointInPolygon(x, y, impl);
return impl.contains(x, y);
}
}
template <typename PointInPolygonImpl, bool use_object_pool>
template <typename PointInConstPolygonImpl, typename PointInNonConstPolygonImpl>
class FunctionPointInPolygon : public IFunction
{
public:
@ -91,7 +92,8 @@ public:
static FunctionPtr create(const Context & context)
{
return std::make_shared<FunctionPointInPolygon<PointInPolygonImpl, use_object_pool>>(context.getSettingsRef().validate_polygons);
return std::make_shared<FunctionPointInPolygon<PointInConstPolygonImpl, PointInNonConstPolygonImpl>>(
context.getSettingsRef().validate_polygons);
}
String getName() const override
@ -116,74 +118,192 @@ public:
throw Exception("Too few arguments", ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION);
}
auto get_message_prefix = [this](size_t i) { return "Argument " + toString(i + 1) + " for function " + getName(); };
for (size_t i = 1; i < arguments.size(); ++i)
auto validate_tuple = [this](size_t i, const DataTypeTuple * tuple)
{
const auto * array = checkAndGetDataType<DataTypeArray>(arguments[i].get());
if (array == nullptr && i != 1)
throw Exception(get_message_prefix(i) + " must be array of tuples.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * tuple = checkAndGetDataType<DataTypeTuple>(array ? array->getNestedType().get() : arguments[i].get());
if (tuple == nullptr)
throw Exception(get_message_prefix(i) + " must contains tuple.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception(getMessagePrefix(i) + " must contain a tuple", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const DataTypes & elements = tuple->getElements();
if (elements.size() != 2)
throw Exception(get_message_prefix(i) + " must have exactly two elements.", ErrorCodes::BAD_ARGUMENTS);
throw Exception(getMessagePrefix(i) + " must have exactly two elements", ErrorCodes::BAD_ARGUMENTS);
for (auto j : ext::range(0, elements.size()))
{
if (!isNativeNumber(elements[j]))
{
throw Exception(get_message_prefix(i) + " must contains numeric tuple at position " + toString(j + 1),
throw Exception(getMessagePrefix(i) + " must contain numeric tuple at position " + toString(j + 1),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
};
validate_tuple(0, checkAndGetDataType<DataTypeTuple>(arguments[0].get()));
if (arguments.size() == 2)
{
const auto * array = checkAndGetDataType<DataTypeArray>(arguments[1].get());
if (array == nullptr)
throw Exception(getMessagePrefix(1) + " must contain an array of tuples or an array of arrays of tuples.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * nested_array = checkAndGetDataType<DataTypeArray>(array->getNestedType().get());
if (nested_array != nullptr)
{
array = nested_array;
}
validate_tuple(1, checkAndGetDataType<DataTypeTuple>(array->getNestedType().get()));
}
else
{
for (size_t i = 1; i < arguments.size(); i++)
{
const auto * array = checkAndGetDataType<DataTypeArray>(arguments[i].get());
if (array == nullptr)
throw Exception(getMessagePrefix(i) + " must contain an array of tuples",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
validate_tuple(i, checkAndGetDataType<DataTypeTuple>(array->getNestedType().get()));
}
}
return std::make_shared<DataTypeUInt8>();
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
const IColumn * point_col = block.getByPosition(arguments[0]).column.get();
const auto * const_tuple_col = checkAndGetColumn<ColumnConst>(point_col);
if (const_tuple_col)
point_col = &const_tuple_col->getDataColumn();
const auto * tuple_col = checkAndGetColumn<ColumnTuple>(point_col);
const auto * tuple_col = checkAndGetColumn<ColumnTuple>(point_col);
if (!tuple_col)
throw Exception("First argument for function " + getName() + " must be constant array of tuples.",
ErrorCodes::ILLEGAL_COLUMN);
auto & result_column = block.safeGetByPosition(result).column;
const auto & tuple_columns = tuple_col->getColumns();
result_column = executeForType(*tuple_columns[0], *tuple_columns[1], block, arguments);
if (const_tuple_col)
const IColumn * poly_col = block.getByPosition(arguments[1]).column.get();
const auto * const_poly_col = checkAndGetColumn<ColumnConst>(poly_col);
bool point_is_const = const_tuple_col != nullptr;
bool poly_is_const = const_poly_col != nullptr;
auto call_impl = poly_is_const
? callPointInPolygonImplWithPool<Polygon, PointInConstPolygonImpl>
: callPointInPolygonImpl<Polygon, PointInNonConstPolygonImpl>;
size_t size = point_is_const && poly_is_const ? 1 : input_rows_count;
auto execution_result = ColumnVector<UInt8>::create(size);
auto & data = execution_result->getData();
Polygon polygon;
for (auto i : ext::range(0, size))
{
if (!poly_is_const || i == 0)
{
polygon = parsePolygon(block, arguments, i);
}
size_t point_index = point_is_const ? 0 : i;
data[i] = call_impl(tuple_columns[0]->getFloat64(point_index), tuple_columns[1]->getFloat64(point_index), polygon);
}
auto & result_column = block.safeGetByPosition(result).column;
result_column = std::move(execution_result);
if (point_is_const && poly_is_const)
result_column = ColumnConst::create(result_column, const_tuple_col->size());
}
private:
bool validate;
ColumnPtr executeForType(const IColumn & x, const IColumn & y, Block & block, const ColumnNumbers & arguments)
std::string getMessagePrefix(size_t i) const
{
return "Argument " + toString(i + 1) + " for function " + getName();
}
Polygon parsePolygonFromSingleColumn(Block & block, const ColumnNumbers & arguments, size_t i) const
{
const auto & poly = block.getByPosition(arguments[1]).column.get();
const auto * column_const = checkAndGetColumn<ColumnConst>(poly);
const auto * array_col =
column_const ? checkAndGetColumn<ColumnArray>(column_const->getDataColumn()) : checkAndGetColumn<ColumnArray>(poly);
if (!array_col)
throw Exception(getMessagePrefix(1) + " must contain an array of tuples or an array of arrays of tuples",
ErrorCodes::ILLEGAL_COLUMN);
const auto * nested_array_col = checkAndGetColumn<ColumnArray>(array_col->getData());
const auto & tuple_data = nested_array_col ? nested_array_col->getData() : array_col->getData();
const auto & tuple_col = checkAndGetColumn<ColumnTuple>(tuple_data);
if (!tuple_col)
throw Exception(getMessagePrefix(1) + " must contain an array of tuples or an array of arrays of tuples",
ErrorCodes::ILLEGAL_COLUMN);
const auto & tuple_columns = tuple_col->getColumns();
const auto & x_column = tuple_columns[0];
const auto & y_column = tuple_columns[1];
auto parse_polygon_part = [&x_column, &y_column](auto & container, size_t l, size_t r)
{
for (auto j : ext::range(l, r))
{
CoordinateType x_coord = x_column->getFloat64(j);
CoordinateType y_coord = y_column->getFloat64(j);
container.push_back(Point(x_coord, y_coord));
}
};
Polygon polygon;
if (nested_array_col)
{
for (auto j : ext::range(array_col->getOffsets()[i - 1], array_col->getOffsets()[i]))
{
size_t l = nested_array_col->getOffsets()[j - 1];
size_t r = nested_array_col->getOffsets()[j];
if (polygon.outer().empty())
{
parse_polygon_part(polygon.outer(), l, r);
}
else
{
polygon.inners().emplace_back();
parse_polygon_part(polygon.inners().back(), l, r);
}
}
}
else
{
size_t l = array_col->getOffsets()[i - 1];
size_t r = array_col->getOffsets()[i];
parse_polygon_part(polygon.outer(), l, r);
}
return polygon;
}
Polygon parsePolygonFromMultipleColumns(Block & block, const ColumnNumbers & arguments, size_t) const
{
Polygon polygon;
auto get_message_prefix = [this](size_t i) { return "Argument " + toString(i + 1) + " for function " + getName(); };
for (size_t i = 1; i < arguments.size(); ++i)
{
const auto * const_col = checkAndGetColumn<ColumnConst>(block.getByPosition(arguments[i]).column.get());
const auto * array_col = const_col ? checkAndGetColumn<ColumnArray>(&const_col->getDataColumn()) : nullptr;
if (!const_col)
throw Exception("Multi-argument version of function " + getName() + " works only with const polygon",
ErrorCodes::BAD_ARGUMENTS);
const auto * array_col = checkAndGetColumn<ColumnArray>(&const_col->getDataColumn());
const auto * tuple_col = array_col ? checkAndGetColumn<ColumnTuple>(&array_col->getData()) : nullptr;
if (!tuple_col)
throw Exception(get_message_prefix(i) + " must be constant array of tuples.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception(getMessagePrefix(i) + " must be constant array of tuples", ErrorCodes::ILLEGAL_COLUMN);
const auto & tuple_columns = tuple_col->getColumns();
const auto & column_x = tuple_columns[0];
@ -197,7 +317,7 @@ private:
auto size = column_x->size();
if (size == 0)
throw Exception(get_message_prefix(i) + " shouldn't be empty.", ErrorCodes::ILLEGAL_COLUMN);
throw Exception(getMessagePrefix(i) + " shouldn't be empty.", ErrorCodes::ILLEGAL_COLUMN);
for (auto j : ext::range(0, size))
{
@ -207,6 +327,21 @@ private:
}
}
return polygon;
}
Polygon parsePolygon(Block & block, const ColumnNumbers & arguments, size_t i) const
{
Polygon polygon;
if (arguments.size() == 2)
{
polygon = parsePolygonFromSingleColumn(block, arguments, i);
}
else
{
polygon = parsePolygonFromMultipleColumns(block, arguments, i);
}
boost::geometry::correct(polygon);
#if !defined(__clang_analyzer__) /// It does not like boost.
@ -218,19 +353,14 @@ private:
throw Exception("Polygon is not valid: " + failure_message, ErrorCodes::BAD_ARGUMENTS);
}
#endif
auto call_impl = use_object_pool
? callPointInPolygonImplWithPool<Polygon, PointInPolygonImpl>
: callPointInPolygonImpl<Polygon, PointInPolygonImpl>;
return call_impl(x, y, polygon);
return polygon;
}
};
void registerFunctionPointInPolygon(FunctionFactory & factory)
{
factory.registerFunction<FunctionPointInPolygon<PointInPolygonWithGrid<Float64>, true>>();
factory.registerFunction<FunctionPointInPolygon<PointInPolygonWithGrid<Float64>, PointInPolygonTrivial<Float64>>>();
}
}

View File

@ -14,6 +14,7 @@
#include <Poco/URI.h>
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/RemoteHostFilter.h>
#include <common/logger_useful.h>
#include <Poco/URIStreamFactory.h>

View File

@ -32,6 +32,7 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/misc.h>
#include <Interpreters/ActionsVisitor.h>

View File

@ -1,6 +1,7 @@
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>

View File

@ -2,11 +2,14 @@
#include <Core/QueryProcessingStage.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Interpreters/StorageID.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
using Scalars = std::map<String, Block>;
namespace ClusterProxy
{

View File

@ -17,6 +17,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Access/AccessRightsElement.h>
#include <Common/DNSResolver.h>
#include <Common/Macros.h>

View File

@ -1,5 +1,5 @@
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/BlockIO.h>
#include <Common/CurrentThread.h>
@ -13,9 +13,15 @@
#include <mutex>
#include <thread>
namespace zkutil
{
class ZooKeeper;
}
namespace DB
{
class Context;
class ASTAlterQuery;
class AccessRightsElements;
struct DDLLogEntry;

View File

@ -4,6 +4,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
@ -509,6 +510,33 @@ std::string ExpressionAction::toString() const
return ss.str();
}
ExpressionActions::ExpressionActions(const NamesAndTypesList & input_columns_, const Context & context_)
: input_columns(input_columns_), settings(context_.getSettingsRef())
{
for (const auto & input_elem : input_columns)
sample_block.insert(ColumnWithTypeAndName(nullptr, input_elem.type, input_elem.name));
#if USE_EMBEDDED_COMPILER
compilation_cache = context_.getCompiledExpressionCache();
#endif
}
/// For constant columns the columns themselves can be contained in `input_columns_`.
ExpressionActions::ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Context & context_)
: settings(context_.getSettingsRef())
{
for (const auto & input_elem : input_columns_)
{
input_columns.emplace_back(input_elem.name, input_elem.type);
sample_block.insert(input_elem);
}
#if USE_EMBEDDED_COMPILER
compilation_cache = context_.getCompiledExpressionCache();
#endif
}
ExpressionActions::~ExpressionActions() = default;
void ExpressionActions::checkLimits(Block & block) const
{
if (settings.max_temporary_columns && block.columns() > settings.max_temporary_columns)

View File

@ -4,7 +4,6 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Names.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Common/SipHash.h>
#include <Common/UInt128.h>
#include <unordered_map>
@ -25,6 +24,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
class Context;
class TableJoin;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
@ -42,6 +42,7 @@ class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
class ExpressionActions;
class CompiledExpressionCache;
/** Action on the block.
*/
@ -155,30 +156,12 @@ class ExpressionActions
public:
using Actions = std::vector<ExpressionAction>;
ExpressionActions(const NamesAndTypesList & input_columns_, const Context & context_)
: input_columns(input_columns_), settings(context_.getSettingsRef())
{
for (const auto & input_elem : input_columns)
sample_block.insert(ColumnWithTypeAndName(nullptr, input_elem.type, input_elem.name));
#if USE_EMBEDDED_COMPILER
compilation_cache = context_.getCompiledExpressionCache();
#endif
}
ExpressionActions(const NamesAndTypesList & input_columns_, const Context & context_);
/// For constant columns the columns themselves can be contained in `input_columns_`.
ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Context & context_)
: settings(context_.getSettingsRef())
{
for (const auto & input_elem : input_columns_)
{
input_columns.emplace_back(input_elem.name, input_elem.type);
sample_block.insert(input_elem);
}
#if USE_EMBEDDED_COMPILER
compilation_cache = context_.getCompiledExpressionCache();
#endif
}
ExpressionActions(const ColumnsWithTypeAndName & input_columns_, const Context & context_);
~ExpressionActions();
/// Add the input column.
/// The name of the column must not match the names of the intermediate columns that occur when evaluating the expression.

View File

@ -32,6 +32,7 @@
#include <Interpreters/HashJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/DictionaryReader.h>
#include <Interpreters/Context.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>

View File

@ -7,7 +7,6 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/Context.h>
#include <Interpreters/interpretSubquery.h>
#include <Common/typeid_cast.h>
#include <Core/Block.h>
@ -17,6 +16,7 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -7,6 +7,7 @@
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/Context.h>
#include <Parsers/DumpASTNode.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/queryToString.h>

View File

@ -1,6 +1,5 @@
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
@ -8,6 +7,8 @@
namespace DB
{
class Context;
/// Returns single row with explain results
class InterpreterExplainQuery : public IInterpreter
{

View File

@ -61,6 +61,7 @@
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/InterpreterGrantQuery.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTSystemQuery.h>

View File

@ -2,13 +2,14 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlockIO.h>
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h>
namespace DB
{
class Context;
/** Interprets the INSERT query.
*/

View File

@ -5,7 +5,6 @@
#include <Core/QueryProcessingStage.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreter.h>
@ -25,6 +24,7 @@ namespace DB
struct SubqueryForSet;
class InterpreterSelectWithUnionQuery;
class Context;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;

View File

@ -1,5 +1,6 @@
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <DataStreams/UnionBlockInputStream.h>

View File

@ -1,15 +1,16 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Context.h>
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
class Context;
class InterpreterSelectQuery;

View File

@ -6,6 +6,8 @@
namespace DB
{
class Context;
class ASTSetRoleQuery;
struct ExtendedRoleSet;
struct User;

View File

@ -13,6 +13,7 @@ limitations under the License. */
#include <Common/typeid_cast.h>
#include <Parsers/ASTWatchQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/Context.h>
#include <Access/AccessFlags.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>

View File

@ -18,11 +18,11 @@ limitations under the License. */
#include <Interpreters/IInterpreter.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
namespace DB
{
class Context;
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
using StoragePtr = std::shared_ptr<IStorage>;

View File

@ -1,6 +1,5 @@
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/InJoinSubqueriesPreprocessor.h>
#include <Interpreters/IdentifierSemantic.h>

View File

@ -3,13 +3,14 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
class ASTSelectQuery;
class Context;
class TableJoin;
struct SelectQueryOptions;

View File

@ -4,6 +4,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MutationCommands.h>

View File

@ -10,6 +10,7 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/Context.h>
namespace DB

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Core/SettingsCollection.h>

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
namespace ProfileEvents

View File

@ -23,6 +23,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/NullableUtils.h>
#include <Interpreters/sortBlock.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/KeyCondition.h>

View File

@ -5,7 +5,6 @@
#include <DataStreams/SizeLimits.h>
#include <DataTypes/IDataType.h>
#include <Interpreters/SetVariants.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Storages/MergeTree/BoolMask.h>
@ -17,6 +16,7 @@ namespace DB
struct Range;
class Context;
class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;

View File

@ -11,7 +11,6 @@
#include <Core/Types.h>
#include <Core/Defines.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Common/Stopwatch.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
@ -65,6 +64,12 @@ namespace ErrorCodes
class Context;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
class ISystemLog

View File

@ -31,6 +31,7 @@
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Context.h>
#include <Common/ProfileEvents.h>
#include <Interpreters/DNSCacheUpdater.h>

View File

@ -1,4 +1,5 @@
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSelectQuery.h>

View File

@ -12,6 +12,7 @@
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -8,6 +8,8 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <IO/ReadHelpers.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -17,7 +19,6 @@
#include <Parsers/CommonParsers.h>
#include <Processors/Formats/Impl/ConstantExpressionTemplate.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Interpreters/convertFieldToType.h>
#include <boost/functional/hash.hpp>

View File

@ -81,13 +81,13 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
: storage(storage_)
, pool(std::move(pool_))
, path{path_ + '/'}
, should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
, should_batch_inserts(storage.global_context->getSettingsRef().distributed_directory_monitor_batch_inserts)
, min_batched_block_size_rows(storage.global_context->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.global_context->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, default_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
, max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, max_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
, bg_pool(bg_pool_)
@ -214,7 +214,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
auto pools = createPoolsForAddresses(name, pool_factory);
const auto settings = storage.global_context.getSettings();
const auto settings = storage.global_context->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
@ -262,7 +262,7 @@ bool StorageDistributedDirectoryMonitor::processFiles()
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `" << file_path << '`');
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context->getSettingsRef());
auto connection = pool->get(timeouts);
try
@ -437,7 +437,7 @@ struct StorageDistributedDirectoryMonitor::Batch
Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
}
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context->getSettingsRef());
auto connection = parent.pool->get(timeouts);
bool batch_broken = false;

View File

@ -17,6 +17,7 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/createBlockSelector.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>

View File

@ -11,7 +11,6 @@
#include <chrono>
#include <optional>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
namespace Poco
@ -22,6 +21,7 @@ namespace Poco
namespace DB
{
class Context;
class StorageDistributed;
/** If insert_sync_ is true, the write is synchronous. Uses insert_timeout_ if it is not zero.

View File

@ -36,6 +36,7 @@ using VolumePtr = std::shared_ptr<IVolume>;
class IMergeTreeReader;
class IMergeTreeDataPartWriter;
class MarkCache;
namespace ErrorCodes
{
@ -319,7 +320,7 @@ protected:
/// checksums.txt and columns.txt. 0 - if not counted;
UInt64 bytes_on_disk{0};
/// Columns description. Cannot be changed, after part initialiation.
/// Columns description. Cannot be changed, after part initialization.
NamesAndTypesList columns;
const Type part_type;
@ -352,7 +353,7 @@ private:
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
/// Loads ttl infos in json format from file ttl.txt. If file doesn`t exists assigns ttl infos with all zeros
/// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros
void loadTTLInfos();
void loadPartitionAndMinMaxIndex();

View File

@ -136,7 +136,7 @@ protected:
size_t next_mark = 0;
size_t next_index_offset = 0;
/// Number of marsk in data from which skip indices have to start
/// Number of marks in data from which skip indices have to start
/// aggregation. I.e. it's data mark number, not skip indices mark.
size_t skip_index_data_mark = 0;

View File

@ -3,7 +3,6 @@
#include <sstream>
#include <optional>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Core/SortDescription.h>
#include <Parsers/ASTExpressionList.h>
@ -15,10 +14,9 @@
namespace DB
{
class Context;
class IFunction;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

View File

@ -3,7 +3,6 @@
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <memory>
#include <list>

View File

@ -18,6 +18,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <Common/SimpleIncrement.h>
#include <Interpreters/Context.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
@ -34,6 +34,7 @@ class MergeListEntry;
class AlterCommands;
class MergeTreePartsMover;
class MutationCommands;
class Context;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
@ -70,7 +71,7 @@ namespace ErrorCodes
/// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows.
///
/// File structure of tables with custom partitioning (format_version >= 1):
/// Part directory - / partiiton-id _ min-id _ max-id _ level /
/// Part directory - / partition-id _ min-id _ max-id _ level /
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// count.txt - contains total number of rows in this part.

View File

@ -26,6 +26,7 @@
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/Context.h>
#include <Common/SimpleIncrement.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>

View File

@ -160,7 +160,7 @@ private:
NamesAndTypesList storage_columns,
const MutationCommands & commands_for_removes);
/// Get skip indcies, that should exists in the resulting data part.
/// Get skip indices, that should exists in the resulting data part.
static MergeTreeIndices getIndicesForNewDataPart(
const MergeTreeIndices & all_indices,
const MutationCommands & commands_for_removes);

View File

@ -6,13 +6,13 @@ namespace DB
{
/** In compact format all columns are stored in one file (`data.bin`).
* Data is splitted in granules and columns are serialized sequentially in one granule.
* Data is split in granules and columns are serialized sequentially in one granule.
* Granules are written one by one in data file.
* Marks are also stored in single file (`data.mrk3`).
* In compact format one mark is an array of marks for every column and a number of rows in granule.
* Format of other data part files is not changed.
* It's considered to store only small parts in compact format (up to 10M).
* NOTE: Compact parts aren't supported for tables with non-adaptive granularty.
* NOTE: Compact parts aren't supported for tables with non-adaptive granularity.
* NOTE: In compact part compressed and uncompressed size of single column is unknown.
*/
class MergeTreeDataPartCompact : public IMergeTreeDataPart

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
#include <Interpreters/Context.h>
namespace DB
{

View File

@ -18,6 +18,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSampleRatio.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Context.h>
/// Allow to use __uint128_t as a template parameter for boost::rational.
// https://stackoverflow.com/questions/41198673/uint128-t-not-working-with-clang-and-libstdc

View File

@ -5,6 +5,7 @@
#include <Common/Exception.h>
#include <Disks/createVolume.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/Context.h>
#include <IO/HashingWriteBuffer.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>

View File

@ -9,7 +9,6 @@
#include <Columns/ColumnsNumber.h>
#include <Interpreters/sortBlock.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeTreeData.h>

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/Context.h>
namespace DB

Some files were not shown because too many files have changed in this diff Show More