Merge branch 'master' into better_data_part_storage_builder

This commit is contained in:
alesapin 2022-06-29 12:33:42 +02:00
commit c80a4c27be
129 changed files with 1219 additions and 190 deletions

View File

@ -643,7 +643,7 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinTidy:
BuilderBinClangTidy:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
@ -1011,7 +1011,7 @@ jobs:
- BuilderBinFreeBSD
# - BuilderBinGCC
- BuilderBinPPC64
- BuilderBinTidy
- BuilderBinClangTidy
- BuilderDebSplitted
runs-on: [self-hosted, style-checker]
steps:

View File

@ -707,7 +707,7 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinTidy:
BuilderBinClangTidy:
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, builder]
steps:
@ -1065,7 +1065,7 @@ jobs:
- BuilderBinFreeBSD
# - BuilderBinGCC
- BuilderBinPPC64
- BuilderBinTidy
- BuilderBinClangTidy
- BuilderDebSplitted
runs-on: [self-hosted, style-checker]
if: ${{ success() || failure() }}

View File

@ -89,7 +89,7 @@ public:
inline void returnObject(T && object_to_return)
{
{
std::lock_guard<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
objects.emplace_back(std::move(object_to_return));
--borrowed_objects_size;
@ -107,14 +107,14 @@ public:
/// Allocated objects size by the pool. If allocatedObjectsSize == maxSize then pool is full.
inline size_t allocatedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
return allocated_objects_size;
}
/// Returns allocatedObjectsSize == maxSize
inline bool isFull() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
return allocated_objects_size == max_size;
}
@ -122,7 +122,7 @@ public:
/// Then client will wait during borrowObject function call.
inline size_t borrowedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
return borrowed_objects_size;
}

View File

@ -4,7 +4,7 @@
namespace Poco::Util
{
class LayeredConfiguration;
class LayeredConfiguration; // NOLINT(cppcoreguidelines-virtual-class-destructor)
}
/// Import extra command line arguments to configuration. These are command line arguments after --.

View File

@ -27,6 +27,6 @@ struct FreeingDeleter
}
};
typedef std::unique_ptr<char, FreeingDeleter> DemangleResult;
using DemangleResult = std::unique_ptr<char, FreeingDeleter>;
DemangleResult tryDemangle(const char * name);

View File

@ -23,10 +23,10 @@ public:
constexpr StrongTypedef(): t() {}
constexpr StrongTypedef(const Self &) = default;
constexpr StrongTypedef(Self &&) = default;
constexpr StrongTypedef(Self &&) noexcept(std::is_nothrow_move_constructible_v<T>) = default;
Self & operator=(const Self &) = default;
Self & operator=(Self &&) = default;
Self & operator=(Self &&) noexcept(std::is_nothrow_move_assignable_v<T>)= default;
template <class Enable = typename std::is_copy_assignable<T>::type>
Self & operator=(const T & rhs) { t = rhs; return *this;}

View File

@ -1,6 +1,6 @@
#pragma once
#include <time.h>
#include <ctime>
#if defined (OS_DARWIN) || defined (OS_SUNOS)
# define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <type_traits>

View File

@ -27,6 +27,8 @@
#include <type_traits>
#include <initializer_list>
// NOLINTBEGIN(*)
namespace wide
{
template <size_t Bits, typename Signed>
@ -257,4 +259,7 @@ struct hash<wide::integer<Bits, Signed>>;
}
// NOLINTEND(*)
#include "wide_integer_impl.h"

View File

@ -15,6 +15,8 @@
#include <boost/multiprecision/cpp_bin_float.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
// NOLINTBEGIN(*)
/// Use same extended double for all platforms
#if (LDBL_MANT_DIG == 64)
#define CONSTEXPR_FROM_DOUBLE constexpr
@ -1478,3 +1480,5 @@ struct hash<wide::integer<Bits, Signed>>
};
}
// NOLINTEND(*)

View File

@ -90,6 +90,7 @@
#define PCG_EMULATED_128BIT_MATH 1
#endif
// NOLINTBEGIN(*)
namespace pcg_extras {
@ -552,4 +553,6 @@ std::ostream& operator<<(std::ostream& out, printable_typename<T>) {
} // namespace pcg_extras
// NOLINTEND(*)
#endif // PCG_EXTRAS_HPP_INCLUDED

View File

@ -113,6 +113,8 @@
#include "pcg_extras.hpp"
// NOLINTBEGIN(*)
namespace DB
{
struct PcgSerializer;
@ -1777,4 +1779,6 @@ typedef pcg_engines::ext_oneseq_xsh_rs_64_32<14,32,true> pcg32_k16384_fast;
#pragma warning(default:4146)
#endif
// NOLINTEND(*)
#endif // PCG_RAND_HPP_INCLUDED

View File

@ -16,6 +16,8 @@
#include <cstddef>
#include <cstdint>
// NOLINTBEGIN(*)
/* Special width values */
enum {
widechar_nonprint = -1, // The character is not printable.
@ -518,4 +520,6 @@ inline int widechar_wcwidth(wchar_t c) {
return 1;
}
// NOLINTEND(*)
#endif // WIDECHAR_WIDTH_H

View File

@ -113,4 +113,8 @@ if (TARGET ch_contrib::krb5)
target_compile_definitions(_gsasl PRIVATE HAVE_GSSAPI_H=1 USE_GSSAPI=1)
endif()
if (TARGET OpenSSL::SSL)
target_link_libraries(_gsasl PRIVATE OpenSSL::Crypto OpenSSL::SSL)
endif()
add_library(ch_contrib::gsasl ALIAS _gsasl)

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit de35b9fd72b57127abdc3a5beaf0e320d767e356
Subproject commit 0e32cb42db76ddaa76848470219056908053b676

View File

@ -11,6 +11,7 @@ add_library (_poco_xml_expat ${SRCS_EXPAT})
add_library (Poco::XML::Expat ALIAS _poco_xml_expat)
target_include_directories (_poco_xml_expat PUBLIC "${LIBRARY_DIR}/XML/include")
target_include_directories (_poco_xml_expat PRIVATE "${LIBRARY_DIR}/Foundation/include")
# Poco::XML

View File

@ -3,6 +3,7 @@ FROM ubuntu:20.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
RUN apt-get update \

View File

@ -30,8 +30,8 @@ set -e
# cleanup for retry run if volume is not recreated
# shellcheck disable=SC2046
{
docker kill $(docker ps -aq) || true
docker rm $(docker ps -aq) || true
docker ps -aq | xargs -r docker kill || true
docker ps -aq | xargs -r docker rm || true
}
echo "Start tests"

View File

@ -215,7 +215,7 @@ start
clickhouse-client --query "SELECT 'Server successfully started', 'OK'" >> /test_output/test_results.tsv \
|| (echo -e 'Server failed to start (see application_errors.txt and clickhouse-server.clean.log)\tFAIL' >> /test_output/test_results.tsv \
&& grep -Fa "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt)
&& grep -a "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt)
[ -f /var/log/clickhouse-server/clickhouse-server.log ] || echo -e "Server log does not exist\tFAIL"
[ -f /var/log/clickhouse-server/stderr.log ] || echo -e "Stderr log does not exist\tFAIL"
@ -313,7 +313,7 @@ then
start 500
clickhouse-client --query "SELECT 'Backward compatibility check: Server successfully started', 'OK'" >> /test_output/test_results.tsv \
|| (echo -e 'Backward compatibility check: Server failed to start\tFAIL' >> /test_output/test_results.tsv \
&& grep -Fa "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log >> /test_output/bc_check_application_errors.txt)
&& grep -a "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log >> /test_output/bc_check_application_errors.txt)
clickhouse-client --query="SELECT 'Server version: ', version()"
@ -343,7 +343,7 @@ then
-e "UNFINISHED" \
-e "Renaming unexpected part" \
-e "PART_IS_TEMPORARILY_LOCKED" \
-e "and a merge is impossible: we didn't find smaller parts" \
-e "and a merge is impossible: we didn't find" \
-e "found in queue and some source parts for it was lost" \
-e "is lost forever." \
/var/log/clickhouse-server/clickhouse-server.backward.clean.log | zgrep -Fa "<Error>" > /test_output/bc_check_error_messages.txt \

View File

@ -86,7 +86,7 @@ def process_test_log(log_path):
test_end = True
test_results = [
(test[0], test[1], test[2], "".join(test[3])) for test in test_results
(test[0], test[1], test[2], "".join(test[3]))[:4096] for test in test_results
]
return (

View File

@ -223,7 +223,7 @@ Replication in ClickHouse can be configured on a per-table basis. You could have
Replication is implemented in the `ReplicatedMergeTree` storage engine. The path in `ZooKeeper` is specified as a parameter for the storage engine. All tables with the same path in `ZooKeeper` become replicas of each other: they synchronize their data and maintain consistency. Replicas can be added and removed dynamically simply by creating or dropping a table.
Replication uses an asynchronous multi-master scheme. You can insert data into any replica that has a session with `ZooKeeper`, and data is replicated to all other replicas asynchronously. Because ClickHouse does not support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts, just-inserted data might be lost if one node fails.
Replication uses an asynchronous multi-master scheme. You can insert data into any replica that has a session with `ZooKeeper`, and data is replicated to all other replicas asynchronously. Because ClickHouse does not support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts by default, just-inserted data might be lost if one node fails. The insert quorum can be enabled using `insert_quorum` setting.
Metadata for replication is stored in ZooKeeper. There is a replication log that lists what actions to do. Actions are: get part; merge parts; drop a partition, and so on. Each replica copies the replication log to its queue and then executes the actions from the queue. For example, on insertion, the “get the part” action is created in the log, and every replica downloads that part. Merges are coordinated between replicas to get byte-identical results. All parts are merged in the same way on all replicas. One of the leaders initiates a new merge first and writes “merge parts” actions to the log. Multiple replicas (or all) can be leaders at the same time. A replica can be prevented from becoming a leader using the `merge_tree` setting `replicated_can_become_leader`. The leaders are responsible for scheduling background merges.

View File

@ -3,6 +3,74 @@ sidebar_label: Geo
sidebar_position: 62
---
# Geo Functions
# Geo Functions
## Geographical Coordinates Functions
- [greatCircleDistance](./coordinates.md#greatCircleDistance)
- [geoDistance](./coordinates.md#geoDistance)
- [greatCircleAngle](./coordinates.md#greatCircleAngle)
- [pointInEllipses](./coordinates.md#pointInEllipses)
- [pointInPolygon](./coordinates.md#pointInPolygon)
## Geohash Functions
- [geohashEncode](./geohash.md#geohashEncode)
- [geohashDecode](./geohash.md#geohashDecode)
- [geohashesInBox](./geohash.md#geohashesInBox)
## H3 Indexes Functions
- [h3IsValid](./h3#h3IsValid)
- [h3GetResolution](./h3#h3GetResolution)
- [h3EdgeAngle](./h3#h3EdgeAngle)
- [h3EdgeLengthM](./h3#h3EdgeLengthM)
- [h3EdgeLengthKm] (./h3#h3EdgeLengthKm)
- [geoToH3](./h3#geoToH3)
- [h3ToGeo](./h3#h3ToGeo)
- [h3ToGeoBoundary](./h3#h3ToGeoBoundary)
- [h3kRing](./h3#h3kRing)
- [h3GetBaseCell](./h3#h3GetBaseCell)
- [h3HexAreaM2](./h3#h3HexAreaM2)
- [h3HexAreaKm2](./h3#h3HexAreaKm2)
- [h3IndexesAreNeighbors](./h3#h3IndexesAreNeighbors)
- [h3ToChildren](./h3#h3ToChildren)
- [h3ToParent](./h3#h3ToParent)
- [h3ToString](./h3#h3ToString)
- [stringToH3](./h3#stringToH3)
- [h3GetResolution](./h3#h3GetResolution)
- [h3IsResClassIII](./h3#h3IsResClassIII)
- [h3IsPentagon](./h3#h3IsPentagon)
- [h3GetFaces](./h3#h3GetFaces)
- [h3CellAreaM2](./h3#h3CellAreaM2)
- [h3CellAreaRads2](./h3#h3CellAreaRads2)
- [h3ToCenterChild](./h3#h3ToCenterChild)
- [h3ExactEdgeLengthM](./h3#h3ExactEdgeLengthM)
- [h3ExactEdgeLengthKm](./h3#h3ExactEdgeLengthKm)
- [h3ExactEdgeLengthRads](./h3#h3ExactEdgeLengthRads)
- [h3NumHexagons](./h3#h3NumHexagons)
- [h3Line](./h3#h3Line)
- [h3Distance](./h3#h3Distance)
- [h3HexRing](./h3#h3HexRing)
- [h3GetUnidirectionalEdge](./h3#h3GetUnidirectionalEdge)
- [h3UnidirectionalEdgeIsValid](./h3#h3UnidirectionalEdgeIsValid)
- [h3GetOriginIndexFromUnidirectionalEdge](./h3#h3GetOriginIndexFromUnidirectionalEdge)
- [h3GetDestinationIndexFromUnidirectionalEdge](./h3#h3GetDestinationIndexFromUnidirectionalEdge)
- [h3GetIndexesFromUnidirectionalEdge](./h3#h3GetIndexesFromUnidirectionalEdge)
- [h3GetUnidirectionalEdgesFromHexagon](./h3#h3GetUnidirectionalEdgesFromHexagon)
- [h3GetUnidirectionalEdgeBoundary](./h3#h3GetUnidirectionalEdgeBoundary)
## S2 Index Functions
- [geoToS2](./s2#geoToS2)
- [s2ToGeo](./s2#s2ToGeo)
- [s2GetNeighbors](./s2#s2GetNeighbors)
- [s2CellsIntersect](./s2#s2CellsIntersect)
- [s2CapContains](./s2#s2CapContains)
- [s2CapUnion](./s2#s2CapUnion)
- [s2RectAdd](./s2#s2RectAdd)
- [s2RectContains](./s2#s2RectContains)
- [s2RectUinion](./s2#s2RectUinion)
- [s2RectIntersection](./s2#s2RectIntersection)
[Original article](https://clickhouse.com/docs/en/sql-reference/functions/geo/) <!--hide-->

View File

@ -223,7 +223,7 @@ void kerberosInit(const String & keytab_file, const String & principal, const St
{
// Using mutex to prevent cache file corruptions
static std::mutex kinit_mtx;
std::unique_lock<std::mutex> lck(kinit_mtx);
std::lock_guard lck(kinit_mtx);
KerberosInit k_init;
k_init.init(keytab_file, principal, cache_name);
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#ifdef NDEBUG
#define ALLOCATOR_ASLR 0

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <memory>
#include <vector>
#include <boost/noncopyable.hpp>

View File

@ -1,6 +1,6 @@
#pragma once
#include <stddef.h>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <atomic>

View File

@ -130,7 +130,7 @@ public:
int getLineNumber() const { return line_number; }
void setLineNumber(int line_number_) { line_number = line_number_;}
const String getFileName() const { return file_name; }
String getFileName() const { return file_name; }
void setFileName(const String & file_name_) { file_name = file_name_; }
Exception * clone() const override { return new ParsingException(*this); }

View File

@ -2,7 +2,7 @@
#include <Common/FileCache_fwd.h>
namespace Poco { namespace Util { class AbstractConfiguration; } }
namespace Poco { namespace Util { class AbstractConfiguration; } } // NOLINT(cppcoreguidelines-virtual-class-destructor)
namespace DB
{

View File

@ -27,9 +27,9 @@ protected:
String getAliasToOrName(const String & name) const
{
if (aliases.count(name))
if (aliases.contains(name))
return aliases.at(name);
else if (String name_lowercase = Poco::toLower(name); case_insensitive_aliases.count(name_lowercase))
else if (String name_lowercase = Poco::toLower(name); case_insensitive_aliases.contains(name_lowercase))
return case_insensitive_aliases.at(name_lowercase);
else
return name;
@ -108,7 +108,7 @@ public:
bool isAlias(const String & name) const
{
return aliases.count(name) || case_insensitive_aliases.count(name);
return aliases.count(name) || case_insensitive_aliases.contains(name);
}
bool hasNameOrAlias(const String & name) const
@ -125,7 +125,7 @@ public:
return name;
}
virtual ~IFactoryWithAliases() override = default;
~IFactoryWithAliases() override = default;
private:
using InnerMap = std::unordered_map<String, Value>; // name -> creator

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <string>
#include <exception>
#include <Common/DateLUT.h>

View File

@ -137,7 +137,7 @@ void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
{
DENY_ALLOCATIONS_IN_SCOPE;
std::unique_lock<std::mutex> lk(overcommit_m);
std::lock_guard lk(overcommit_m);
if (picked_tracker == tracker)
{
LOG_DEBUG_SAFE(getLogger(), "Picked query stopped");

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <cstddef>
#include <cassert>
#include <algorithm>

View File

@ -55,7 +55,7 @@ private:
explicit PoolEntryHelper(PooledObject & data_) : data(data_) { data.in_use = true; }
~PoolEntryHelper()
{
std::unique_lock lock(data.pool.mutex);
std::lock_guard lock(data.pool.mutex);
data.in_use = false;
data.pool.available.notify_one();
}
@ -163,7 +163,7 @@ public:
inline size_t size()
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return items.size();
}

View File

@ -4,7 +4,7 @@
#include "base/types.h"
#include <atomic>
#include <memory>
#include <stddef.h>
#include <cstddef>
/** Implements global counters for various events happening in the application
* - for high level profiling.

View File

@ -7,7 +7,7 @@
#include <array>
#include <optional>
#include <functional>
#include <signal.h>
#include <csignal>
#ifdef OS_DARWIN
// ucontext is not available without _XOPEN_SOURCE

View File

@ -7,8 +7,8 @@
#include <Core/Defines.h>
#include <base/range.h>
#include <Poco/Unicode.h>
#include <stdint.h>
#include <string.h>
#include <cstdint>
#include <cstring>
#ifdef __SSE2__
#include <emmintrin.h>

View File

@ -139,7 +139,7 @@ void SystemLogBase<LogElement>::flush(bool force)
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (is_shutdown)
return;

View File

@ -209,7 +209,7 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
shutdown = true;
}
@ -224,14 +224,14 @@ void ThreadPoolImpl<Thread>::finalize()
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return scheduled_jobs;
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::finished() const
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return shutdown;
}
@ -290,7 +290,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
job = {};
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (!first_exception)
first_exception = std::current_exception(); // NOLINT
if (shutdown_on_exception)
@ -305,7 +305,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
--scheduled_jobs;
if (threads.size() > scheduled_jobs + max_free_threads)

View File

@ -1,7 +1,7 @@
#pragma once
#include <base/types.h>
#include <string.h>
#include <cstring>
#include <algorithm>
#include <utility>
#include <base/range.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#ifdef __SSE2__
# include <emmintrin.h>

View File

@ -576,7 +576,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto set_initialized = [this]()
{
std::unique_lock lock(initialized_mutex);
std::lock_guard lock(initialized_mutex);
initialized_flag = true;
initialized_cv.notify_all();
};

View File

@ -54,7 +54,7 @@ DEFINE_FIELD_VECTOR(Map); /// TODO: use map instead of vector.
#undef DEFINE_FIELD_VECTOR
using FieldMap = std::map<String, Field, std::less<String>, AllocatorWithMemoryTracking<std::pair<const String, Field>>>;
using FieldMap = std::map<String, Field, std::less<>, AllocatorWithMemoryTracking<std::pair<const String, Field>>>;
#define DEFINE_FIELD_MAP(X) \
struct X : public FieldMap \

View File

@ -32,7 +32,7 @@ struct ConnectionInfo
class Connection : private boost::noncopyable
{
public:
Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3);
explicit Connection(const ConnectionInfo & connection_info_, bool replication_ = false, size_t num_tries = 3);
void execWithRetry(const std::function<void(pqxx::nontransaction &)> & exec);

View File

@ -25,13 +25,13 @@ public:
static constexpr inline auto POSTGRESQL_POOL_WAIT_TIMEOUT = 5000;
static constexpr inline auto POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES = 5;
PoolWithFailover(
explicit PoolWithFailover(
const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,
size_t max_tries_ = POSTGRESQL_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
PoolWithFailover(
explicit PoolWithFailover(
const DB::StoragePostgreSQLConfiguration & configuration,
size_t pool_size = POSTGRESQL_POOL_DEFAULT_SIZE,
size_t pool_wait_timeout = POSTGRESQL_POOL_WAIT_TIMEOUT,

View File

@ -591,6 +591,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, use_local_cache_for_remote_storage, true, "Use local cache for remote storage like HDFS or S3, it's used for remote table engine only", 0) \
\
M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \
M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \
M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@ -925,7 +925,7 @@ void BaseDaemon::handleSignal(int signal_id)
signal_id == SIGQUIT ||
signal_id == SIGTERM)
{
std::unique_lock<std::mutex> lock(signal_handler_mutex);
std::lock_guard lock(signal_handler_mutex);
{
++terminate_signals_counter;
sigint_signals_counter += signal_id == SIGINT;

View File

@ -39,7 +39,7 @@ DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (!settings->allows_query_when_mysql_lost && exception)
{
@ -59,7 +59,7 @@ void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
exception = exception_;
}

View File

@ -12,7 +12,7 @@
namespace Poco
{
class Logger;
class Logger; // NOLINT(cppcoreguidelines-virtual-class-destructor)
}
class AtomicStopwatch;

View File

@ -135,14 +135,14 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::updateThreadFunction()
/// Notify thread about finished updating the bunch of ids
/// where their own ids were included.
std::unique_lock<std::mutex> lock(update_mutex);
std::lock_guard lock(update_mutex);
unit_to_update->is_done = true;
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
std::lock_guard lock(update_mutex);
unit_to_update->current_exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
is_update_finished.notify_all();

View File

@ -8,11 +8,6 @@
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
namespace Redis
{
class Client;

View File

@ -90,7 +90,7 @@ DiskCacheWrapper::DiskCacheWrapper(
std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(const String & path) const
{
std::unique_lock<std::mutex> lock{mutex};
std::lock_guard lock{mutex};
auto it = file_downloads.find(path);
if (it != file_downloads.end())
@ -101,7 +101,7 @@ std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(
new FileDownloadMetadata,
[this, path] (FileDownloadMetadata * p)
{
std::unique_lock<std::mutex> erase_lock{mutex};
std::lock_guard erase_lock{mutex};
file_downloads.erase(path);
delete p;
});

View File

@ -113,7 +113,7 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
}
void AzureObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
auto client_ptr = client.get();

View File

@ -73,7 +73,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const std::string & path) override;

View File

@ -51,7 +51,7 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', buf);
storage_objects[i].relative_path = remote_fs_object_path;
storage_objects[i].path = remote_fs_object_path;
storage_objects[i].bytes_size = remote_fs_object_size;
}

View File

@ -12,17 +12,6 @@ namespace DB
struct DiskObjectStorageMetadata
{
private:
struct RelativePathWithSize
{
String relative_path;
size_t bytes_size;
RelativePathWithSize() = default;
RelativePathWithSize(const String & relative_path_, size_t bytes_size_)
: relative_path(relative_path_), bytes_size(bytes_size_) {}
};
/// Metadata file version.
static constexpr uint32_t VERSION_ABSOLUTE_PATHS = 1;
static constexpr uint32_t VERSION_RELATIVE_PATHS = 2;
@ -31,7 +20,7 @@ private:
const std::string & common_metadata_path;
/// Relative paths of blobs.
std::vector<RelativePathWithSize> storage_objects;
RelativePathsWithSize storage_objects;
/// URI
const std::string & remote_fs_root_path;
@ -71,7 +60,7 @@ public:
return remote_fs_root_path;
}
std::vector<RelativePathWithSize> getBlobsRelativePaths() const
RelativePathsWithSize getBlobsRelativePaths() const
{
return storage_objects;
}

View File

@ -1,5 +1,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
@ -379,7 +380,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
return true;
};
PathsWithSize children;
RelativePathsWithSize children;
source_object_storage->listPrefix(restore_information.source_path, children);
restore_files(children);
@ -523,7 +524,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
return true;
};
PathsWithSize children;
RelativePathsWithSize children;
source_object_storage->listPrefix(restore_information.source_path + "operations/", children);
restore_file_operations(children);

View File

@ -81,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
}
void HDFSObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
const size_t begin_of_path = path.find('/', path.find("//") + 2);
int32_t num_entries;

View File

@ -75,7 +75,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const std::string & path) override;

View File

@ -41,6 +41,7 @@ struct PathWithSize
/// List of paths with their sizes
using PathsWithSize = std::vector<PathWithSize>;
using RelativePathsWithSize = PathsWithSize;
struct ObjectMetadata
{
@ -65,7 +66,7 @@ public:
virtual bool exists(const std::string & path) const = 0;
/// List on prefix, return children (relative paths) with their sizes.
virtual void listPrefix(const std::string & path, PathsWithSize & children) const = 0;
virtual void listPrefix(const std::string & path, RelativePathsWithSize & children) const = 0;
/// Get object metadata if supported. It should be possible to receive
/// at least size of object

View File

@ -56,7 +56,7 @@ void MetadataStorageFromDiskTransaction::commit()
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
{
std::unique_lock lock(metadata_storage.metadata_mutex);
std::lock_guard lock(metadata_storage.metadata_mutex);
for (size_t i = 0; i < operations.size(); ++i)
{
try

View File

@ -28,7 +28,7 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
{
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString());
std::unique_lock lock(cache_mutex);
std::lock_guard lock(cache_mutex);
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
@ -110,7 +110,7 @@ void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest
if (config.proxy_host.empty())
return;
std::unique_lock lock(cache_mutex);
std::lock_guard lock(cache_mutex);
if (!cache_ttl.count() || !cache_valid)
return;

View File

@ -195,7 +195,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
}
void S3ObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();

View File

@ -80,7 +80,7 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exist or it's a directory.
void removeObject(const std::string & path) override;

View File

@ -36,7 +36,7 @@ struct FormatSettings
bool seekable_read = true;
UInt64 max_rows_to_read_for_schema_inference = 100;
String column_names_for_schema_inference = "";
String column_names_for_schema_inference;
enum class DateTimeInputFormat
{

View File

@ -205,7 +205,7 @@ struct ConvertImpl
if constexpr (std::is_same_v<FromDataType, DataTypeUUID> != std::is_same_v<ToDataType, DataTypeUUID>)
{
throw Exception("Conversion between numeric types and UUID is not supported", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Conversion between numeric types and UUID is not supported. Probably the passed UUID is unquoted", ErrorCodes::NOT_IMPLEMENTED);
}
else
{

View File

@ -181,7 +181,7 @@ public:
}
private:
void nextImpl() override final
void nextImpl() final
{
const size_t prev_size = Base::position() - memory.data();
memory.resize(2 * prev_size + 1);

View File

@ -212,7 +212,7 @@ namespace
size_t max_connections_per_endpoint,
bool resolve_host = true)
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);

View File

@ -148,7 +148,7 @@ public:
{
String credentials_string;
{
std::unique_lock<std::recursive_mutex> locker(token_mutex);
std::lock_guard locker(token_mutex);
LOG_TRACE(logger, "Getting default credentials for EC2 instance.");
auto result = GetResourceWithAWSWebServiceResult(endpoint.c_str(), EC2_SECURITY_CREDENTIALS_RESOURCE, nullptr);
@ -194,7 +194,7 @@ public:
String new_token;
{
std::unique_lock<std::recursive_mutex> locker(token_mutex);
std::lock_guard locker(token_mutex);
Aws::StringStream ss;
ss << endpoint << EC2_IMDS_TOKEN_RESOURCE;

View File

@ -1,11 +1,10 @@
#pragma once
#include <algorithm>
#include <cstring>
#include <memory>
#include <iostream>
#include <cassert>
#include <string.h>
#include <cstring>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>

View File

@ -64,7 +64,7 @@ public:
}
private:
void finalizeImpl() override final
void finalizeImpl() override
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT

View File

@ -215,7 +215,7 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
}
}
std::unique_lock write_lock(rwlock);
std::lock_guard write_lock(rwlock);
auto it = queue.emplace(key, std::make_shared<Container>()).first;
pushImpl(std::move(entry), it);
}
@ -343,7 +343,7 @@ void AsynchronousInsertQueue::cleanup()
if (!keys_to_remove.empty())
{
std::unique_lock write_lock(rwlock);
std::lock_guard write_lock(rwlock);
size_t total_removed = 0;
for (const auto & key : keys_to_remove)

View File

@ -698,7 +698,7 @@ std::unique_lock<std::shared_mutex> DatabaseCatalog::getExclusiveDDLGuardForData
{
DDLGuards::iterator db_guard_iter;
{
std::unique_lock lock(ddl_guards_mutex);
std::lock_guard lock(ddl_guards_mutex);
db_guard_iter = ddl_guards.try_emplace(database).first;
assert(db_guard_iter->second.first.contains(""));
}

View File

@ -60,7 +60,7 @@ bool EmbeddedDictionaries::reloadDictionary(
bool EmbeddedDictionaries::reloadImpl(const bool throw_on_error, const bool force_reload)
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
/** If you can not update the directories, then despite this, do not throw an exception (use the old directories).
* If there are no old correct directories, then when using functions that depend on them,

View File

@ -0,0 +1,77 @@
#include <Interpreters/InterpreterCreateIndexQuery.h>
#include <Access/ContextAccess.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Storages/AlterCommands.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
}
BlockIO InterpreterCreateIndexQuery::execute()
{
const auto & create_index = query_ptr->as<ASTCreateIndexQuery &>();
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_ADD_INDEX, create_index.getDatabase(), create_index.getTable());
if (!create_index.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(required_access);
auto table_id = getContext()->resolveStorageID(create_index, Context::ResolveOrdinary);
query_ptr->as<ASTCreateIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
/// Convert ASTCreateIndexQuery to AlterCommand.
AlterCommands alter_commands;
AlterCommand command;
command.index_decl = create_index.index_decl;
command.type = AlterCommand::ADD_INDEX;
command.index_name = create_index.index_name->as<ASTIdentifier &>().name();
command.if_not_exists = create_index.if_not_exists;
/// Fill name in ASTIndexDeclaration
auto & ast_index_decl = command.index_decl->as<ASTIndexDeclaration &>();
ast_index_decl.name = command.index_name;
alter_commands.emplace_back(std::move(command));
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(table, getContext());
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, getContext());
table->alter(alter_commands, getContext(), alter_lock);
return {};
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
class InterpreterCreateIndexQuery : public IInterpreter, WithContext
{
public:
InterpreterCreateIndexQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
};
}

View File

@ -0,0 +1,71 @@
#include <Access/ContextAccess.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/InterpreterDropIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/AlterCommands.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
}
BlockIO InterpreterDropIndexQuery::execute()
{
const auto & drop_index = query_ptr->as<ASTDropIndexQuery &>();
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_DROP_INDEX, drop_index.getDatabase(), drop_index.getTable());
if (!drop_index.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(required_access);
auto table_id = getContext()->resolveStorageID(drop_index, Context::ResolveOrdinary);
query_ptr->as<ASTDropIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
/// Convert ASTDropIndexQuery to AlterCommand.
AlterCommands alter_commands;
AlterCommand command;
command.ast = drop_index.convertToASTAlterCommand();
command.type = AlterCommand::DROP_INDEX;
command.index_name = drop_index.index_name->as<ASTIdentifier &>().name();
command.if_exists = drop_index.if_exists;
alter_commands.emplace_back(std::move(command));
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(table, getContext());
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, getContext());
table->alter(alter_commands, getContext(), alter_lock);
return {};
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
class InterpreterDropIndexQuery : public IInterpreter, WithContext
{
public:
InterpreterDropIndexQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
};
}

View File

@ -3,7 +3,9 @@
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTInsertQuery.h>
@ -42,10 +44,12 @@
#include <Interpreters/InterpreterBackupQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Interpreters/InterpreterCreateIndexQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterDescribeCacheQuery.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/InterpreterDropIndexQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterExplainQuery.h>
@ -298,6 +302,14 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
{
return std::make_unique<InterpreterDropFunctionQuery>(query, context);
}
else if (query->as<ASTCreateIndexQuery>())
{
return std::make_unique<InterpreterCreateIndexQuery>(query, context);
}
else if (query->as<ASTDropIndexQuery>())
{
return std::make_unique<InterpreterDropIndexQuery>(query, context);
}
else if (query->as<ASTBackupQuery>())
{
return std::make_unique<InterpreterBackupQuery>(query, context);

View File

@ -1590,7 +1590,8 @@ static void executeMergeAggregatedImpl(
bool has_grouping_sets,
const Settings & settings,
const NamesAndTypesList & aggregation_keys,
const AggregateDescriptions & aggregates)
const AggregateDescriptions & aggregates,
bool should_produce_results_in_order_of_bucket_number)
{
auto keys = aggregation_keys.getNames();
if (has_grouping_sets)
@ -1619,7 +1620,8 @@ static void executeMergeAggregatedImpl(
final,
settings.distributed_aggregation_memory_efficient && is_remote_storage,
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads);
settings.aggregation_memory_efficient_merge_threads,
should_produce_results_in_order_of_bucket_number);
query_plan.addStep(std::move(merging_aggregated));
}
@ -1680,6 +1682,9 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
query_plan.addStep(std::move(expression_before_aggregation));
}
// Let's just choose the safe option since we don't know the value of `to_stage` here.
const bool should_produce_results_in_order_of_bucket_number = true;
executeMergeAggregatedImpl(
query_plan,
query_info.projection->aggregate_overflow_row,
@ -1688,7 +1693,8 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
false,
context_->getSettingsRef(),
query_info.projection->aggregation_keys,
query_info.projection->aggregate_descriptions);
query_info.projection->aggregate_descriptions,
should_produce_results_in_order_of_bucket_number);
}
}
}
@ -2268,6 +2274,9 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
const bool should_produce_results_in_order_of_bucket_number
= options.to_stage == QueryProcessingStage::WithMergeableState && settings.distributed_aggregation_memory_efficient;
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan.getCurrentDataStream(),
std::move(aggregator_params),
@ -2279,7 +2288,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
temporary_data_merge_threads,
storage_has_evenly_distributed_read,
std::move(group_by_info),
std::move(group_by_sort_description));
std::move(group_by_sort_description),
should_produce_results_in_order_of_bucket_number);
query_plan.addStep(std::move(aggregating_step));
}
@ -2292,6 +2302,9 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return;
const bool should_produce_results_in_order_of_bucket_number = options.to_stage == QueryProcessingStage::WithMergeableState
&& context->getSettingsRef().distributed_aggregation_memory_efficient;
executeMergeAggregatedImpl(
query_plan,
overflow_row,
@ -2300,7 +2313,8 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
has_grouping_sets,
context->getSettingsRef(),
query_analyzer->aggregationKeys(),
query_analyzer->aggregates());
query_analyzer->aggregates(),
should_produce_results_in_order_of_bucket_number);
}

View File

@ -580,7 +580,7 @@ void MergeJoin::mergeRightBlocks()
void MergeJoin::mergeInMemoryRightBlocks()
{
std::unique_lock lock(rwlock);
std::lock_guard lock(rwlock);
if (right_blocks.empty())
return;
@ -613,7 +613,7 @@ void MergeJoin::mergeInMemoryRightBlocks()
void MergeJoin::mergeFlushedRightBlocks()
{
std::unique_lock lock(rwlock);
std::lock_guard lock(rwlock);
auto callback = [&](const Block & block)
{
@ -638,7 +638,7 @@ bool MergeJoin::saveRightBlock(Block && block)
{
if (is_in_memory)
{
std::unique_lock lock(rwlock);
std::lock_guard lock(rwlock);
if (!is_in_memory)
{

View File

@ -103,7 +103,7 @@ void NO_INLINE Set::insertFromBlockImplCase(
void Set::setHeader(const ColumnsWithTypeAndName & header)
{
std::unique_lock lock(rwlock);
std::lock_guard lock(rwlock);
if (!data.empty())
return;

View File

@ -10,7 +10,7 @@ namespace Poco
{
namespace Util
{
class AbstractConfiguration;
class AbstractConfiguration; // NOLINT(cppcoreguidelines-virtual-class-destructor)
}
}

View File

@ -1,3 +1,4 @@
#include <Parsers/ASTIndexDeclaration.h>
#include <iomanip>
#include <IO/Operators.h>
#include <Parsers/ASTAlterQuery.h>
@ -556,6 +557,7 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
switch (alter_object)

View File

@ -0,0 +1,61 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIndexDeclaration.h>
namespace DB
{
/** Get the text that identifies this element. */
String ASTCreateIndexQuery::getID(char delim) const
{
return "CreateIndexQuery" + (delim + getDatabase()) + delim + getTable();
}
ASTPtr ASTCreateIndexQuery::clone() const
{
auto res = std::make_shared<ASTCreateIndexQuery>(*this);
res->children.clear();
res->index_name = index_name->clone();
res->children.push_back(res->index_name);
res->index_decl = index_decl->clone();
res->children.push_back(res->index_decl);
return res;
}
void ASTCreateIndexQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
settings.ostr << "CREATE INDEX " << (if_not_exists ? "IF NOT EXISTS " : "");
index_name->formatImpl(settings, state, frame);
settings.ostr << " ON ";
settings.ostr << (settings.hilite ? hilite_none : "");
if (table)
{
if (database)
{
settings.ostr << indent_str << backQuoteIfNeed(getDatabase());
settings.ostr << ".";
}
settings.ostr << indent_str << backQuoteIfNeed(getTable());
}
formatOnCluster(settings);
if (!cluster.empty())
settings.ostr << " ";
index_decl->formatImpl(settings, state, frame);
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/IAST.h>
namespace DB
{
/** CREATE INDEX [IF NOT EXISTS] name ON [db].name (expression) TYPE type GRANULARITY value
*/
class ASTCreateIndexQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool if_not_exists{false};
ASTPtr index_name;
/// Stores the IndexDeclaration here.
ASTPtr index_decl;
String getID(char delim) const override;
ASTPtr clone() const override;
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams & params) const override
{
return removeOnCluster<ASTCreateIndexQuery>(clone(), params.default_database);
}
virtual QueryKind getQueryKind() const override { return QueryKind::Create; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -0,0 +1,63 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTDropIndexQuery.h>
namespace DB
{
/** Get the text that identifies this element. */
String ASTDropIndexQuery::getID(char delim) const
{
return "CreateIndexQuery" + (delim + getDatabase()) + delim + getTable();
}
ASTPtr ASTDropIndexQuery::clone() const
{
auto res = std::make_shared<ASTDropIndexQuery>(*this);
res->children.clear();
res->index_name = index_name->clone();
res->children.push_back(res->index_name);
return res;
}
void ASTDropIndexQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
settings.ostr << "DROP INDEX " << (if_exists ? "IF EXISTS " : "");
index_name->formatImpl(settings, state, frame);
settings.ostr << " ON ";
settings.ostr << (settings.hilite ? hilite_none : "");
if (table)
{
if (database)
{
settings.ostr << indent_str << backQuoteIfNeed(getDatabase());
settings.ostr << ".";
}
settings.ostr << indent_str << backQuoteIfNeed(getTable());
}
formatOnCluster(settings);
}
ASTPtr ASTDropIndexQuery::convertToASTAlterCommand() const
{
auto command = std::make_shared<ASTAlterCommand>();
command->index = index_name->clone();
command->if_exists = if_exists;
command->type = ASTAlterCommand::DROP_INDEX;
return command;
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <optional>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/IAST.h>
#include <Parsers/IParserBase.h>
namespace DB
{
/** DROP INDEX [IF EXISTS] name on [db].name
*/
class ASTDropIndexQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool if_exists{false};
ASTPtr index_name;
String getID(char delim) const override;
ASTPtr clone() const override;
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams & params) const override
{
return removeOnCluster<ASTDropIndexQuery>(clone(), params.default_database);
}
virtual QueryKind getQueryKind() const override { return QueryKind::Drop; }
/// Convert ASTDropIndexQuery to ASTAlterCommand.
ASTPtr convertToASTAlterCommand() const;
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -25,9 +25,19 @@ ASTPtr ASTIndexDeclaration::clone() const
void ASTIndexDeclaration::formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const
{
s.ostr << backQuoteIfNeed(name);
s.ostr << " ";
expr->formatImpl(s, state, frame);
if (from_create_index)
{
s.ostr << "(";
expr->formatImpl(s, state, frame);
s.ostr << ")";
}
else
{
s.ostr << backQuoteIfNeed(name);
s.ostr << " ";
expr->formatImpl(s, state, frame);
}
s.ostr << (s.hilite ? hilite_keyword : "") << " TYPE " << (s.hilite ? hilite_none : "");
type->formatImpl(s, state, frame);
s.ostr << (s.hilite ? hilite_keyword : "") << " GRANULARITY " << (s.hilite ? hilite_none : "");

View File

@ -16,6 +16,7 @@ public:
IAST * expr;
ASTFunction * type;
UInt64 granularity;
bool from_create_index = false;
/** Get the text that identifies this element. */
String getID(char) const override { return "Index"; }

View File

@ -840,7 +840,6 @@ bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return true;
}
bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTAlterQuery>();

View File

@ -0,0 +1,120 @@
#include <Parsers/ParserCreateIndexQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserDataType.h>
#include <Parsers/parseDatabaseAndTableName.h>
namespace DB
{
bool ParserCreateIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_type("TYPE");
ParserKeyword s_granularity("GRANULARITY");
ParserDataType data_type_p;
ParserExpression expression_p;
ParserUnsignedInteger granularity_p;
ASTPtr expr;
ASTPtr type;
ASTPtr granularity;
/// Skip name parser for SQL-standard CREATE INDEX
if (!expression_p.parse(pos, expr, expected))
return false;
if (!s_type.ignore(pos, expected))
return false;
if (!data_type_p.parse(pos, type, expected))
return false;
if (!s_granularity.ignore(pos, expected))
return false;
if (!granularity_p.parse(pos, granularity, expected))
return false;
auto index = std::make_shared<ASTIndexDeclaration>();
index->from_create_index = true;
index->granularity = granularity->as<ASTLiteral &>().value.safeGet<UInt64>();
index->set(index->expr, expr);
index->set(index->type, type);
node = index;
return true;
}
bool ParserCreateIndexQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTCreateIndexQuery>();
node = query;
ParserKeyword s_create("CREATE");
ParserKeyword s_index("INDEX");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier index_name_p;
ParserCreateIndexDeclaration parser_create_idx_decl;
ASTPtr index_name;
ASTPtr index_decl;
String cluster_str;
bool if_not_exists = false;
if (!s_create.ignore(pos, expected))
return false;
if (!s_index.ignore(pos, expected))
return false;
if (s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!index_name_p.parse(pos, index_name, expected))
return false;
/// ON [db.] table_name
if (!s_on.ignore(pos, expected))
return false;
if (!parseDatabaseAndTableAsAST(pos, expected, query->database, query->table))
return false;
/// [ON cluster_name]
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
if (!parser_create_idx_decl.parse(pos, index_decl, expected))
return false;
query->index_name = index_name;
query->children.push_back(index_name);
query->index_decl = index_decl;
query->children.push_back(index_decl);
query->if_not_exists = if_not_exists;
query->cluster = cluster_str;
if (query->database)
query->children.push_back(query->database);
if (query->table)
query->children.push_back(query->table);
return true;
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Query like this:
* CREATE INDEX [IF NOT EXISTS] name ON [db].name (expression) TYPE type GRANULARITY value
*/
class ParserCreateIndexQuery : public IParserBase
{
protected:
const char * getName() const override{ return "CREATE INDEX query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Parser for index declaration in create index, where name is ignored. */
class ParserCreateIndexDeclaration : public IParserBase
{
public:
ParserCreateIndexDeclaration() {}
protected:
const char * getName() const override { return "index declaration in create index"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -467,6 +467,7 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
ParserKeyword s_from("FROM");
ParserKeyword s_on("ON");
ParserToken s_dot(TokenType::Dot);
ParserToken s_comma(TokenType::Comma);
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
ParserStorage storage_p;
@ -574,6 +575,10 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
if (!table_properties_p.parse(pos, columns_list, expected))
return false;
/// We allow a trailing comma in the columns list for user convenience.
/// Although it diverges from the SQL standard slightly.
s_comma.ignore(pos, expected);
if (!s_rparen.ignore(pos, expected))
return false;

View File

@ -0,0 +1,67 @@
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserDropIndexQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>
namespace DB
{
bool ParserDropIndexQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTDropIndexQuery>();
node = query;
ParserKeyword s_drop("DROP");
ParserKeyword s_index("INDEX");
ParserKeyword s_on("ON");
ParserKeyword s_if_exists("IF EXISTS");
ParserIdentifier index_name_p;
String cluster_str;
bool if_exists = false;
if (!s_drop.ignore(pos, expected))
return false;
if (!s_index.ignore(pos, expected))
return false;
if (s_if_exists.ignore(pos, expected))
if_exists = true;
if (!index_name_p.parse(pos, query->index_name, expected))
return false;
/// ON [db.] table_name
if (!s_on.ignore(pos, expected))
return false;
if (!parseDatabaseAndTableAsAST(pos, expected, query->database, query->table))
return false;
/// [ON cluster_name]
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
query->cluster = std::move(cluster_str);
}
if (query->index_name)
query->children.push_back(query->index_name);
query->if_exists = if_exists;
if (query->database)
query->children.push_back(query->database);
if (query->table)
query->children.push_back(query->table);
return true;
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Query like this:
* DROP INDEX [IF EXISTS] name ON [db].name
*/
class ParserDropIndexQuery : public IParserBase
{
protected:
const char * getName() const override{ return "DROP INDEX query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -2,7 +2,9 @@
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Parsers/ParserBackupQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserCreateIndexQuery.h>
#include <Parsers/ParserDropFunctionQuery.h>
#include <Parsers/ParserDropIndexQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
@ -43,6 +45,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserCreateSettingsProfileQuery create_settings_profile_p;
ParserCreateFunctionQuery create_function_p;
ParserDropFunctionQuery drop_function_p;
ParserCreateIndexQuery create_index_p;
ParserDropIndexQuery drop_index_p;
ParserDropAccessEntityQuery drop_access_entity_p;
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
@ -63,6 +67,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| create_function_p.parse(pos, node, expected)
|| drop_function_p.parse(pos, node, expected)
|| create_index_p.parse(pos, node, expected)
|| drop_index_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| external_ddl_p.parse(pos, node, expected)

View File

@ -20,7 +20,7 @@ namespace DB
}
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (background_exception)
std::rethrow_exception(background_exception);
@ -30,7 +30,7 @@ namespace DB
void ParallelFormattingOutputFormat::addChunk(Chunk chunk, ProcessingUnitType type, bool can_throw_exception)
{
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (background_exception && can_throw_exception)
std::rethrow_exception(background_exception);
}

View File

@ -236,7 +236,7 @@ private:
void onBackgroundException()
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();

View File

@ -121,7 +121,7 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
void ParallelParsingInputFormat::onBackgroundException(size_t offset)
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();
@ -233,7 +233,7 @@ Chunk ParallelParsingInputFormat::generate()
else
{
// Pass the unit back to the segmentator.
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
unit.status = READY_TO_INSERT;
segmentator_condvar.notify_all();
}

View File

@ -19,13 +19,13 @@
namespace DB
{
static ITransformingStep::Traits getTraits()
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = true,
.returns_single_stream = should_produce_results_in_order_of_bucket_number, /// Actually, may also return single stream if should_produce_results_in_order_of_bucket_number = false
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
@ -75,9 +75,10 @@ AggregatingStep::AggregatingStep(
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_)
SortDescription group_by_sort_description_,
bool should_produce_results_in_order_of_bucket_number_)
: ITransformingStep(
input_stream_, appendGroupingColumn(params_.getHeader(input_stream_.header, final_), grouping_sets_params_), getTraits(), false)
input_stream_, appendGroupingColumn(params_.getHeader(input_stream_.header, final_), grouping_sets_params_), getTraits(should_produce_results_in_order_of_bucket_number_), false)
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(final_)
@ -88,6 +89,7 @@ AggregatingStep::AggregatingStep(
, storage_has_evenly_distributed_read(storage_has_evenly_distributed_read_)
, group_by_info(std::move(group_by_info_))
, group_by_sort_description(std::move(group_by_sort_description_))
, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
{
}
@ -351,7 +353,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
pipeline.resize(1);
/// We add the explicit resize here, but not in case of aggregating in order, since AIO don't use two-level hash tables and thus returns only buckets with bucket_number = -1.
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : pipeline.getNumStreams(), true /* force */);
aggregating = collector.detachProcessors(0);
}

Some files were not shown because too many files have changed in this diff Show More