Merge branch 'master' into sql-insert-format

This commit is contained in:
mergify[bot] 2022-06-29 11:03:07 +00:00 committed by GitHub
commit 9482c99ab8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
410 changed files with 2100 additions and 606 deletions

View File

@ -643,7 +643,7 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinTidy:
BuilderBinClangTidy:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
@ -1011,7 +1011,7 @@ jobs:
- BuilderBinFreeBSD
# - BuilderBinGCC
- BuilderBinPPC64
- BuilderBinTidy
- BuilderBinClangTidy
- BuilderDebSplitted
runs-on: [self-hosted, style-checker]
steps:

View File

@ -707,7 +707,7 @@ jobs:
# shellcheck disable=SC2046
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinTidy:
BuilderBinClangTidy:
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, builder]
steps:
@ -1065,7 +1065,7 @@ jobs:
- BuilderBinFreeBSD
# - BuilderBinGCC
- BuilderBinPPC64
- BuilderBinTidy
- BuilderBinClangTidy
- BuilderDebSplitted
runs-on: [self-hosted, style-checker]
if: ${{ success() || failure() }}

View File

@ -89,7 +89,7 @@ public:
inline void returnObject(T && object_to_return)
{
{
std::lock_guard<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
objects.emplace_back(std::move(object_to_return));
--borrowed_objects_size;
@ -107,14 +107,14 @@ public:
/// Allocated objects size by the pool. If allocatedObjectsSize == maxSize then pool is full.
inline size_t allocatedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
return allocated_objects_size;
}
/// Returns allocatedObjectsSize == maxSize
inline bool isFull() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
return allocated_objects_size == max_size;
}
@ -122,7 +122,7 @@ public:
/// Then client will wait during borrowObject function call.
inline size_t borrowedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
std::lock_guard lock(objects_mutex);
return borrowed_objects_size;
}

View File

@ -4,7 +4,7 @@
namespace Poco::Util
{
class LayeredConfiguration;
class LayeredConfiguration; // NOLINT(cppcoreguidelines-virtual-class-destructor)
}
/// Import extra command line arguments to configuration. These are command line arguments after --.

View File

@ -124,21 +124,37 @@
#endif
#endif
// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
/// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
/// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
#if defined(__clang__)
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) // data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) // pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) // thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) // thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) // annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) // disable TSA for a function
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) /// data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) /// pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) /// thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) /// thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) /// annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) /// disable TSA for a function
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// Consider adding a comment before using these macros.
# define TSA_SUPPRESS_WARNING_FOR_READ(x) [&]() TSA_NO_THREAD_SAFETY_ANALYSIS -> const auto & { return (x); }()
# define TSA_SUPPRESS_WARNING_FOR_WRITE(x) [&]() TSA_NO_THREAD_SAFETY_ANALYSIS -> auto & { return (x); }()
/// This macro is useful when only one thread writes to a member
/// and you want to read this member from the same thread without locking a mutex.
/// It's safe (because no concurrent writes are possible), but TSA generates a warning.
/// (Seems like there's no way to verify it, but it makes sense to distinguish it from TSA_SUPPRESS_WARNING_FOR_READ for readability)
# define TSA_READ_ONE_THREAD(x) TSA_SUPPRESS_WARNING_FOR_READ(x)
#else
# define TSA_GUARDED_BY(...)
# define TSA_PT_GUARDED_BY(...)
# define TSA_REQUIRES(...)
# define TSA_REQUIRES_SHARED(...)
# define TSA_NO_THREAD_SAFETY_ANALYSIS
# define TSA_SUPPRESS_WARNING_FOR_READ(x)
# define TSA_SUPPRESS_WARNING_FOR_WRITE(x)
# define TSA_READ_ONE_THREAD(x)
#endif
/// A template function for suppressing warnings about unused variables or function results.

View File

@ -27,6 +27,6 @@ struct FreeingDeleter
}
};
typedef std::unique_ptr<char, FreeingDeleter> DemangleResult;
using DemangleResult = std::unique_ptr<char, FreeingDeleter>;
DemangleResult tryDemangle(const char * name);

View File

@ -23,10 +23,10 @@ public:
constexpr StrongTypedef(): t() {}
constexpr StrongTypedef(const Self &) = default;
constexpr StrongTypedef(Self &&) = default;
constexpr StrongTypedef(Self &&) noexcept(std::is_nothrow_move_constructible_v<T>) = default;
Self & operator=(const Self &) = default;
Self & operator=(Self &&) = default;
Self & operator=(Self &&) noexcept(std::is_nothrow_move_assignable_v<T>)= default;
template <class Enable = typename std::is_copy_assignable<T>::type>
Self & operator=(const T & rhs) { t = rhs; return *this;}

View File

@ -1,6 +1,6 @@
#pragma once
#include <time.h>
#include <ctime>
#if defined (OS_DARWIN) || defined (OS_SUNOS)
# define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <type_traits>

View File

@ -27,6 +27,8 @@
#include <type_traits>
#include <initializer_list>
// NOLINTBEGIN(*)
namespace wide
{
template <size_t Bits, typename Signed>
@ -257,4 +259,7 @@ struct hash<wide::integer<Bits, Signed>>;
}
// NOLINTEND(*)
#include "wide_integer_impl.h"

View File

@ -15,6 +15,8 @@
#include <boost/multiprecision/cpp_bin_float.hpp>
#include <boost/math/special_functions/fpclassify.hpp>
// NOLINTBEGIN(*)
/// Use same extended double for all platforms
#if (LDBL_MANT_DIG == 64)
#define CONSTEXPR_FROM_DOUBLE constexpr
@ -1478,3 +1480,5 @@ struct hash<wide::integer<Bits, Signed>>
};
}
// NOLINTEND(*)

View File

@ -90,6 +90,7 @@
#define PCG_EMULATED_128BIT_MATH 1
#endif
// NOLINTBEGIN(*)
namespace pcg_extras {
@ -552,4 +553,6 @@ std::ostream& operator<<(std::ostream& out, printable_typename<T>) {
} // namespace pcg_extras
// NOLINTEND(*)
#endif // PCG_EXTRAS_HPP_INCLUDED

View File

@ -113,6 +113,8 @@
#include "pcg_extras.hpp"
// NOLINTBEGIN(*)
namespace DB
{
struct PcgSerializer;
@ -1777,4 +1779,6 @@ typedef pcg_engines::ext_oneseq_xsh_rs_64_32<14,32,true> pcg32_k16384_fast;
#pragma warning(default:4146)
#endif
// NOLINTEND(*)
#endif // PCG_RAND_HPP_INCLUDED

View File

@ -16,6 +16,8 @@
#include <cstddef>
#include <cstdint>
// NOLINTBEGIN(*)
/* Special width values */
enum {
widechar_nonprint = -1, // The character is not printable.
@ -518,4 +520,6 @@ inline int widechar_wcwidth(wchar_t c) {
return 1;
}
// NOLINTEND(*)
#endif // WIDECHAR_WIDTH_H

View File

@ -113,4 +113,8 @@ if (TARGET ch_contrib::krb5)
target_compile_definitions(_gsasl PRIVATE HAVE_GSSAPI_H=1 USE_GSSAPI=1)
endif()
if (TARGET OpenSSL::SSL)
target_link_libraries(_gsasl PRIVATE OpenSSL::Crypto OpenSSL::SSL)
endif()
add_library(ch_contrib::gsasl ALIAS _gsasl)

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit de35b9fd72b57127abdc3a5beaf0e320d767e356
Subproject commit 0e32cb42db76ddaa76848470219056908053b676

View File

@ -11,6 +11,7 @@ add_library (_poco_xml_expat ${SRCS_EXPAT})
add_library (Poco::XML::Expat ALIAS _poco_xml_expat)
target_include_directories (_poco_xml_expat PUBLIC "${LIBRARY_DIR}/XML/include")
target_include_directories (_poco_xml_expat PRIVATE "${LIBRARY_DIR}/Foundation/include")
# Poco::XML

View File

@ -3,6 +3,7 @@ FROM ubuntu:20.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
RUN apt-get update \

View File

@ -30,8 +30,8 @@ set -e
# cleanup for retry run if volume is not recreated
# shellcheck disable=SC2046
{
docker kill $(docker ps -aq) || true
docker rm $(docker ps -aq) || true
docker ps -aq | xargs -r docker kill || true
docker ps -aq | xargs -r docker rm || true
}
echo "Start tests"

View File

@ -120,6 +120,10 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--replicated-database')
fi
if [[ -n "$USE_DATABASE_ORDINARY" ]] && [[ "$USE_DATABASE_ORDINARY" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--db-engine=Ordinary')
fi
set +e
clickhouse-test -j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time \
--skip 00168_parallel_processing_on_replicas "${ADDITIONAL_OPTIONS[@]}" \

View File

@ -115,6 +115,10 @@ function run_tests()
ADDITIONAL_OPTIONS+=("$RUN_BY_HASH_TOTAL")
fi
if [[ -n "$USE_DATABASE_ORDINARY" ]] && [[ "$USE_DATABASE_ORDINARY" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--db-engine=Ordinary')
fi
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \

View File

@ -284,6 +284,11 @@ then
rm -rf /var/lib/clickhouse/*
# Make BC check more funny by forcing Ordinary engine for system database
# New version will try to convert it to Atomic on startup
mkdir /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
# Install previous release packages
install_packages previous_release_package_folder

View File

@ -86,7 +86,7 @@ def process_test_log(log_path):
test_end = True
test_results = [
(test[0], test[1], test[2], "".join(test[3])) for test in test_results
(test[0], test[1], test[2], "".join(test[3]))[:4096] for test in test_results
]
return (

View File

@ -223,7 +223,7 @@ Replication in ClickHouse can be configured on a per-table basis. You could have
Replication is implemented in the `ReplicatedMergeTree` storage engine. The path in `ZooKeeper` is specified as a parameter for the storage engine. All tables with the same path in `ZooKeeper` become replicas of each other: they synchronize their data and maintain consistency. Replicas can be added and removed dynamically simply by creating or dropping a table.
Replication uses an asynchronous multi-master scheme. You can insert data into any replica that has a session with `ZooKeeper`, and data is replicated to all other replicas asynchronously. Because ClickHouse does not support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts, just-inserted data might be lost if one node fails.
Replication uses an asynchronous multi-master scheme. You can insert data into any replica that has a session with `ZooKeeper`, and data is replicated to all other replicas asynchronously. Because ClickHouse does not support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts by default, just-inserted data might be lost if one node fails. The insert quorum can be enabled using `insert_quorum` setting.
Metadata for replication is stored in ZooKeeper. There is a replication log that lists what actions to do. Actions are: get part; merge parts; drop a partition, and so on. Each replica copies the replication log to its queue and then executes the actions from the queue. For example, on insertion, the “get the part” action is created in the log, and every replica downloads that part. Merges are coordinated between replicas to get byte-identical results. All parts are merged in the same way on all replicas. One of the leaders initiates a new merge first and writes “merge parts” actions to the log. Multiple replicas (or all) can be leaders at the same time. A replica can be prevented from becoming a leader using the `merge_tree` setting `replicated_can_become_leader`. The leaders are responsible for scheduling background merges.

View File

@ -3,6 +3,74 @@ sidebar_label: Geo
sidebar_position: 62
---
# Geo Functions
# Geo Functions
## Geographical Coordinates Functions
- [greatCircleDistance](./coordinates.md#greatCircleDistance)
- [geoDistance](./coordinates.md#geoDistance)
- [greatCircleAngle](./coordinates.md#greatCircleAngle)
- [pointInEllipses](./coordinates.md#pointInEllipses)
- [pointInPolygon](./coordinates.md#pointInPolygon)
## Geohash Functions
- [geohashEncode](./geohash.md#geohashEncode)
- [geohashDecode](./geohash.md#geohashDecode)
- [geohashesInBox](./geohash.md#geohashesInBox)
## H3 Indexes Functions
- [h3IsValid](./h3#h3IsValid)
- [h3GetResolution](./h3#h3GetResolution)
- [h3EdgeAngle](./h3#h3EdgeAngle)
- [h3EdgeLengthM](./h3#h3EdgeLengthM)
- [h3EdgeLengthKm] (./h3#h3EdgeLengthKm)
- [geoToH3](./h3#geoToH3)
- [h3ToGeo](./h3#h3ToGeo)
- [h3ToGeoBoundary](./h3#h3ToGeoBoundary)
- [h3kRing](./h3#h3kRing)
- [h3GetBaseCell](./h3#h3GetBaseCell)
- [h3HexAreaM2](./h3#h3HexAreaM2)
- [h3HexAreaKm2](./h3#h3HexAreaKm2)
- [h3IndexesAreNeighbors](./h3#h3IndexesAreNeighbors)
- [h3ToChildren](./h3#h3ToChildren)
- [h3ToParent](./h3#h3ToParent)
- [h3ToString](./h3#h3ToString)
- [stringToH3](./h3#stringToH3)
- [h3GetResolution](./h3#h3GetResolution)
- [h3IsResClassIII](./h3#h3IsResClassIII)
- [h3IsPentagon](./h3#h3IsPentagon)
- [h3GetFaces](./h3#h3GetFaces)
- [h3CellAreaM2](./h3#h3CellAreaM2)
- [h3CellAreaRads2](./h3#h3CellAreaRads2)
- [h3ToCenterChild](./h3#h3ToCenterChild)
- [h3ExactEdgeLengthM](./h3#h3ExactEdgeLengthM)
- [h3ExactEdgeLengthKm](./h3#h3ExactEdgeLengthKm)
- [h3ExactEdgeLengthRads](./h3#h3ExactEdgeLengthRads)
- [h3NumHexagons](./h3#h3NumHexagons)
- [h3Line](./h3#h3Line)
- [h3Distance](./h3#h3Distance)
- [h3HexRing](./h3#h3HexRing)
- [h3GetUnidirectionalEdge](./h3#h3GetUnidirectionalEdge)
- [h3UnidirectionalEdgeIsValid](./h3#h3UnidirectionalEdgeIsValid)
- [h3GetOriginIndexFromUnidirectionalEdge](./h3#h3GetOriginIndexFromUnidirectionalEdge)
- [h3GetDestinationIndexFromUnidirectionalEdge](./h3#h3GetDestinationIndexFromUnidirectionalEdge)
- [h3GetIndexesFromUnidirectionalEdge](./h3#h3GetIndexesFromUnidirectionalEdge)
- [h3GetUnidirectionalEdgesFromHexagon](./h3#h3GetUnidirectionalEdgesFromHexagon)
- [h3GetUnidirectionalEdgeBoundary](./h3#h3GetUnidirectionalEdgeBoundary)
## S2 Index Functions
- [geoToS2](./s2#geoToS2)
- [s2ToGeo](./s2#s2ToGeo)
- [s2GetNeighbors](./s2#s2GetNeighbors)
- [s2CellsIntersect](./s2#s2CellsIntersect)
- [s2CapContains](./s2#s2CapContains)
- [s2CapUnion](./s2#s2CapUnion)
- [s2RectAdd](./s2#s2RectAdd)
- [s2RectContains](./s2#s2RectContains)
- [s2RectUinion](./s2#s2RectUinion)
- [s2RectIntersection](./s2#s2RectIntersection)
[Original article](https://clickhouse.com/docs/en/sql-reference/functions/geo/) <!--hide-->

View File

@ -1488,6 +1488,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertOrdinaryDatabaseToAtomic(global_context, DatabaseCatalog::instance().getSystemDatabase());
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();

View File

@ -223,7 +223,7 @@ void kerberosInit(const String & keytab_file, const String & principal, const St
{
// Using mutex to prevent cache file corruptions
static std::mutex kinit_mtx;
std::unique_lock<std::mutex> lck(kinit_mtx);
std::lock_guard lck(kinit_mtx);
KerberosInit k_init;
k_init.init(keytab_file, principal, cache_name);
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#ifdef NDEBUG
#define ALLOCATOR_ASLR 0

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <memory>
#include <vector>
#include <boost/noncopyable.hpp>

View File

@ -1,6 +1,6 @@
#pragma once
#include <stddef.h>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <atomic>

View File

@ -130,7 +130,7 @@ public:
int getLineNumber() const { return line_number; }
void setLineNumber(int line_number_) { line_number = line_number_;}
const String getFileName() const { return file_name; }
String getFileName() const { return file_name; }
void setFileName(const String & file_name_) { file_name = file_name_; }
Exception * clone() const override { return new ParsingException(*this); }

View File

@ -2,7 +2,7 @@
#include <Common/FileCache_fwd.h>
namespace Poco { namespace Util { class AbstractConfiguration; } }
namespace Poco { namespace Util { class AbstractConfiguration; } } // NOLINT(cppcoreguidelines-virtual-class-destructor)
namespace DB
{

View File

@ -27,9 +27,9 @@ protected:
String getAliasToOrName(const String & name) const
{
if (aliases.count(name))
if (aliases.contains(name))
return aliases.at(name);
else if (String name_lowercase = Poco::toLower(name); case_insensitive_aliases.count(name_lowercase))
else if (String name_lowercase = Poco::toLower(name); case_insensitive_aliases.contains(name_lowercase))
return case_insensitive_aliases.at(name_lowercase);
else
return name;
@ -108,7 +108,7 @@ public:
bool isAlias(const String & name) const
{
return aliases.count(name) || case_insensitive_aliases.count(name);
return aliases.count(name) || case_insensitive_aliases.contains(name);
}
bool hasNameOrAlias(const String & name) const
@ -125,7 +125,7 @@ public:
return name;
}
virtual ~IFactoryWithAliases() override = default;
~IFactoryWithAliases() override = default;
private:
using InnerMap = std::unordered_map<String, Value>; // name -> creator

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <string>
#include <exception>
#include <Common/DateLUT.h>

View File

@ -137,7 +137,7 @@ void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
{
DENY_ALLOCATIONS_IN_SCOPE;
std::unique_lock<std::mutex> lk(overcommit_m);
std::lock_guard lk(overcommit_m);
if (picked_tracker == tracker)
{
LOG_DEBUG_SAFE(getLogger(), "Picked query stopped");

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#include <cstddef>
#include <cassert>
#include <algorithm>

View File

@ -55,7 +55,7 @@ private:
explicit PoolEntryHelper(PooledObject & data_) : data(data_) { data.in_use = true; }
~PoolEntryHelper()
{
std::unique_lock lock(data.pool.mutex);
std::lock_guard lock(data.pool.mutex);
data.in_use = false;
data.pool.available.notify_one();
}
@ -163,7 +163,7 @@ public:
inline size_t size()
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return items.size();
}

View File

@ -4,7 +4,7 @@
#include "base/types.h"
#include <atomic>
#include <memory>
#include <stddef.h>
#include <cstddef>
/** Implements global counters for various events happening in the application
* - for high level profiling.

View File

@ -7,7 +7,7 @@
#include <array>
#include <optional>
#include <functional>
#include <signal.h>
#include <csignal>
#ifdef OS_DARWIN
// ucontext is not available without _XOPEN_SOURCE

View File

@ -7,8 +7,8 @@
#include <Core/Defines.h>
#include <base/range.h>
#include <Poco/Unicode.h>
#include <stdint.h>
#include <string.h>
#include <cstdint>
#include <cstring>
#ifdef __SSE2__
#include <emmintrin.h>

View File

@ -139,7 +139,7 @@ void SystemLogBase<LogElement>::flush(bool force)
uint64_t this_thread_requested_offset;
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (is_shutdown)
return;

View File

@ -209,7 +209,7 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
shutdown = true;
}
@ -224,14 +224,14 @@ void ThreadPoolImpl<Thread>::finalize()
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return scheduled_jobs;
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::finished() const
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return shutdown;
}
@ -290,7 +290,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
job = {};
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (!first_exception)
first_exception = std::current_exception(); // NOLINT
if (shutdown_on_exception)
@ -305,7 +305,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
}
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
--scheduled_jobs;
if (threads.size() > scheduled_jobs + max_free_threads)

View File

@ -1,7 +1,7 @@
#pragma once
#include <base/types.h>
#include <string.h>
#include <cstring>
#include <algorithm>
#include <utility>
#include <base/range.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <string.h>
#include <cstring>
#ifdef __SSE2__
# include <emmintrin.h>

View File

@ -576,7 +576,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto set_initialized = [this]()
{
std::unique_lock lock(initialized_mutex);
std::lock_guard lock(initialized_mutex);
initialized_flag = true;
initialized_cv.notify_all();
};

View File

@ -54,7 +54,7 @@ DEFINE_FIELD_VECTOR(Map); /// TODO: use map instead of vector.
#undef DEFINE_FIELD_VECTOR
using FieldMap = std::map<String, Field, std::less<String>, AllocatorWithMemoryTracking<std::pair<const String, Field>>>;
using FieldMap = std::map<String, Field, std::less<>, AllocatorWithMemoryTracking<std::pair<const String, Field>>>;
#define DEFINE_FIELD_MAP(X) \
struct X : public FieldMap \

View File

@ -445,7 +445,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Seconds, wait_for_window_view_fire_signal_timeout, 10, "Timeout for waiting for window view fire signal in event time processing", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic, "Default database engine.", 0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::None, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(Bool, show_table_uuid_in_table_create_query_if_not_nil, false, "For tables in databases with Engine=Atomic show UUID of the table in its CREATE query.", 0) \
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \
@ -640,6 +639,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
MAKE_OBSOLETE(M, UInt64, background_schedule_pool_size, 128) \
MAKE_OBSOLETE(M, UInt64, background_message_broker_schedule_pool_size, 16) \
MAKE_OBSOLETE(M, UInt64, background_distributed_schedule_pool_size, 16) \
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@ -925,7 +925,7 @@ void BaseDaemon::handleSignal(int signal_id)
signal_id == SIGQUIT ||
signal_id == SIGTERM)
{
std::unique_lock<std::mutex> lock(signal_handler_mutex);
std::lock_guard lock(signal_handler_mutex);
{
++terminate_signals_counter;
sigint_signals_counter += signal_id == SIGINT;

View File

@ -73,7 +73,7 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const
void DatabaseAtomic::drop(ContextPtr)
{
assert(tables.empty());
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
try
{
fs::remove(path_to_metadata_symlink);
@ -90,40 +90,40 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseOrdinary::attachTableUnlocked(name, table, lock);
DatabaseOrdinary::attachTableUnlocked(name, table);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name)
{
DetachedTables not_in_use;
std::unique_lock lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name, lock);
std::lock_guard lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleanupDetachedTables(); //-V1001
return table;
}
void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
auto table = tryGetTable(table_name, local_context);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
if (table)
table->dropInnerTableIfAny(no_delay, local_context);
table->dropInnerTableIfAny(sync, local_context);
else
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
backQuote(database_name), backQuote(table_name));
backQuote(getDatabaseName()), backQuote(table_name));
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop;
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
auto txn = local_context->getZooKeeperMetadataTransaction();
if (txn && !local_context->isInternalSubquery())
@ -136,7 +136,7 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
/// TODO better detection and recovery
fs::rename(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped
DatabaseOrdinary::detachTableUnlocked(table_name, lock); /// Should never throw
DatabaseOrdinary::detachTableUnlocked(table_name); /// Should never throw
table_name_to_path.erase(table_name);
}
@ -145,11 +145,12 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
/// Notify DatabaseCatalog that table was dropped. It will remove table data in background.
/// Cleanup is performed outside of database to allow easily DROP DATABASE without waiting for cleanup to complete.
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, no_delay);
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, sync);
}
void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary)
TSA_NO_THREAD_SAFETY_ANALYSIS /// TSA does not support conditional locking
{
if (typeid(*this) != typeid(to_database))
{
@ -173,7 +174,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink)
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink) TSA_REQUIRES(db.mutex)
{
auto it = db.table_name_to_path.find(table_name_);
String table_data_path_saved;
@ -188,7 +189,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
return table_data_path_saved;
};
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_)
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_) TSA_REQUIRES(db.mutex)
{
db.tables.emplace(table_name_, table_);
if (table_data_path_.empty())
@ -229,9 +230,9 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
}
if (!exchange)
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name, inside_database ? db_lock : other_db_lock);
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name);
StoragePtr table = getTableUnlocked(table_name, db_lock);
StoragePtr table = getTableUnlocked(table_name);
if (dictionary && !table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
@ -244,7 +245,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
StorageID other_table_new_id = StorageID::createEmpty();
if (exchange)
{
other_table = other_db.getTableUnlocked(to_table_name, other_db_lock);
other_table = other_db.getTableUnlocked(to_table_name);
if (dictionary && !other_table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
other_table_new_id = {database_name, table_name, other_table->getStorageID().uuid};
@ -294,7 +295,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
auto table_data_path = getTableDataPath(query);
try
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
if (query.getDatabase() != database_name)
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`",
database_name, query.getDatabase());
@ -312,7 +313,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
attachTableUnlocked(query.getTable(), table, lock); /// Should never throw
attachTableUnlocked(query.getTable(), table); /// Should never throw
table_name_to_path.emplace(query.getTable(), table_data_path);
}
catch (...)
@ -330,8 +331,8 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
bool check_file_exists = true;
SCOPE_EXIT({ std::error_code code; if (check_file_exists) std::filesystem::remove(table_metadata_tmp_path, code); });
std::unique_lock lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name, lock)->getStorageID();
std::lock_guard lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name)->getStorageID();
if (table_id.uuid != actual_table_id.uuid)
throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER);
@ -363,7 +364,7 @@ void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
void DatabaseAtomic::setDetachedTableNotInUseForce(const UUID & uuid)
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
detached_tables.erase(uuid);
}

View File

@ -35,7 +35,7 @@ public:
bool exchange,
bool dictionary) override;
void dropTable(ContextPtr context, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & name) override;
@ -70,9 +70,9 @@ protected:
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) override;
void assertDetachedTableNotInUse(const UUID & uuid);
void assertDetachedTableNotInUse(const UUID & uuid) TSA_REQUIRES(mutex);
using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables();
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
void tryCreateMetadataSymlink();
@ -80,9 +80,9 @@ protected:
//TODO store path in DatabaseWithOwnTables::tables
using NameToPathMap = std::unordered_map<String, String>;
NameToPathMap table_name_to_path;
NameToPathMap table_name_to_path TSA_GUARDED_BY(mutex);
DetachedTables detached_tables;
DetachedTables detached_tables TSA_GUARDED_BY(mutex);
String path_to_table_symlinks;
String path_to_metadata_symlink;
const UUID db_uuid;

View File

@ -62,36 +62,19 @@ namespace ErrorCodes
DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context)
{
bool created = false;
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
try
{
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context);
/// Before 20.7 it's possible that .sql metadata file does not exist for some old database.
/// In this case Ordinary database is created on server startup if the corresponding metadata directory exists.
/// So we should remove metadata directory if database creation failed.
/// TODO remove this code
created = fs::create_directory(metadata_path);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Database, impl->getEngineName());
DatabasePtr impl = getImpl(create, metadata_path, context);
/// Attach database metadata
if (impl && create.comment)
impl->setDatabaseComment(create.comment->as<ASTLiteral>()->value.safeGet<String>());
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Database, impl->getEngineName());
// Attach database metadata
if (impl && create.comment)
impl->setDatabaseComment(create.comment->as<ASTLiteral>()->value.safeGet<String>());
return impl;
}
catch (...)
{
if (created && fs::exists(metadata_path))
fs::remove_all(metadata_path);
throw;
}
return impl;
}
template <typename ValueType>
@ -139,8 +122,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name);
if (engine_name == "Ordinary")
{
if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary)
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)");
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
else if (engine_name == "Atomic")
}
if (engine_name == "Atomic")
return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name, context);

View File

@ -77,10 +77,10 @@ void DatabaseLazy::createTable(
void DatabaseLazy::dropTable(
ContextPtr local_context,
const String & table_name,
bool no_delay)
bool sync)
{
SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
DatabaseOnDisk::dropTable(local_context, table_name, no_delay);
DatabaseOnDisk::dropTable(local_context, table_name, sync);
}
void DatabaseLazy::renameTable(
@ -158,6 +158,7 @@ DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const Filt
bool DatabaseLazy::empty() const
{
std::lock_guard lock(mutex);
return tables_cache.empty();
}

View File

@ -37,7 +37,7 @@ public:
void dropTable(
ContextPtr context,
const String & table_name,
bool no_delay) override;
bool sync) override;
void renameTable(
ContextPtr context,
@ -102,8 +102,8 @@ private:
const time_t expiration_time;
/// TODO use DatabaseWithOwnTablesBase::tables
mutable TablesCache tables_cache;
mutable CacheExpirationQueue cache_expiration_queue;
mutable TablesCache tables_cache TSA_GUARDED_BY(mutex);
mutable CacheExpirationQueue cache_expiration_queue TSA_GUARDED_BY(mutex);
StoragePtr loadTable(const String & table_name) const;

View File

@ -32,8 +32,8 @@ void DatabaseMemory::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
std::unique_lock lock{mutex};
attachTableUnlocked(table_name, table, lock);
std::lock_guard lock{mutex};
attachTableUnlocked(table_name, table);
/// Clean the query from temporary flags.
ASTPtr query_to_store = query;
@ -52,23 +52,24 @@ void DatabaseMemory::createTable(
void DatabaseMemory::dropTable(
ContextPtr /*context*/,
const String & table_name,
bool /*no_delay*/)
bool /*sync*/)
{
std::unique_lock lock{mutex};
auto table = detachTableUnlocked(table_name, lock);
StoragePtr table;
{
std::lock_guard lock{mutex};
table = detachTableUnlocked(table_name);
}
try
{
/// Remove table without lock since:
/// - it does not require it
/// - it may cause lock-order-inversion if underlying storage need to
/// resolve tables (like StorageLiveView)
SCOPE_EXIT(lock.lock());
lock.unlock();
table->drop();
if (table->storesDataOnDisk())
{
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
assert(getDatabaseName() != DatabaseCatalog::TEMPORARY_DATABASE);
fs::path table_data_dir{getTableDataPath(table_name)};
if (fs::exists(table_data_dir))
fs::remove_all(table_data_dir);
@ -76,10 +77,13 @@ void DatabaseMemory::dropTable(
}
catch (...)
{
std::lock_guard lock{mutex};
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
attachTableUnlocked(table_name, table, lock);
attachTableUnlocked(table_name, table);
throw;
}
std::lock_guard lock{mutex};
table->is_dropped = true;
create_queries.erase(table_name);
UUID table_uuid = table->getStorageID().uuid;

View File

@ -32,7 +32,7 @@ public:
void dropTable(
ContextPtr context,
const String & table_name,
bool no_delay) override;
bool sync) override;
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
ASTPtr getCreateDatabaseQuery() const override;
@ -51,9 +51,9 @@ public:
void alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
private:
String data_path;
const String data_path;
using NameToASTCreate = std::unordered_map<String, ASTPtr>;
NameToASTCreate create_queries;
NameToASTCreate create_queries TSA_GUARDED_BY(mutex);
};
}

View File

@ -281,7 +281,7 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
}
}
void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*no_delay*/)
void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
{
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop = table_metadata_path + drop_suffix;
@ -321,11 +321,11 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_name) const
{
std::unique_lock lock(mutex);
checkMetadataFilenameAvailabilityUnlocked(to_table_name, lock);
std::lock_guard lock(mutex);
checkMetadataFilenameAvailabilityUnlocked(to_table_name);
}
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const
{
String table_metadata_path = getObjectMetadataPath(to_table_name);
@ -503,7 +503,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
void DatabaseOnDisk::drop(ContextPtr local_context)
{
assert(tables.empty());
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
{
fs::remove_all(local_context->getPath() + getDataPath());
@ -725,8 +725,6 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromStorage(const String & table_name, cons
void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context)
{
std::lock_guard lock(modify_settings_mutex);
auto create_query = getCreateDatabaseQuery()->clone();
auto * create = create_query->as<ASTCreateQuery>();
auto * settings = create->storage->settings;
@ -759,7 +757,7 @@ void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_cha
writeChar('\n', statement_buf);
String statement = statement_buf.str();
String database_name_escaped = escapeForFileName(database_name);
String database_name_escaped = escapeForFileName(TSA_SUPPRESS_WARNING_FOR_READ(database_name)); /// FIXME
fs::path metadata_root_path = fs::canonical(query_context->getGlobalContext()->getPath());
fs::path metadata_file_tmp_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql.tmp");
fs::path metadata_file_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql");

View File

@ -43,7 +43,7 @@ public:
void dropTable(
ContextPtr context,
const String & table_name,
bool no_delay) override;
bool sync) override;
void renameTable(
ContextPtr context,
@ -70,7 +70,7 @@ public:
/// will throw when the table we want to attach already exists (in active / detached / detached permanently form)
void checkMetadataFilenameAvailability(const String & to_table_name) const override;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const TSA_REQUIRES(mutex);
void modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context);
@ -99,9 +99,6 @@ protected:
const String metadata_path;
const String data_path;
/// For alter settings.
std::mutex modify_settings_mutex;
};
}

View File

@ -174,7 +174,8 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
if (ast)
{
auto * create_query = ast->as<ASTCreateQuery>();
create_query->setDatabase(database_name);
/// NOTE No concurrent writes are possible during database loading
create_query->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name));
/// Even if we don't load the table we can still mark the uuid of it as taken.
if (create_query->uuid != UUIDHelpers::Nil)
@ -201,7 +202,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
return;
}
QualifiedTableName qualified_name{database_name, create_query->getTable()};
QualifiedTableName qualified_name{TSA_SUPPRESS_WARNING_FOR_READ(database_name), create_query->getTable()};
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext(), qualified_name, ast);
std::lock_guard lock{metadata.mutex};
@ -234,12 +235,12 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
size_t tables_in_database = objects_in_database - dictionaries_in_database;
LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
database_name, tables_in_database, dictionaries_in_database);
TSA_SUPPRESS_WARNING_FOR_READ(database_name), tables_in_database, dictionaries_in_database);
}
void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore)
{
assert(name.database == database_name);
assert(name.database == TSA_SUPPRESS_WARNING_FOR_READ(database_name));
const auto & create_query = ast->as<const ASTCreateQuery &>();
tryAttachTable(
@ -255,7 +256,8 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_rest
{
LOG_INFO(log, "Starting up tables.");
const size_t total_tables = tables.size();
/// NOTE No concurrent writes are possible during database loading
const size_t total_tables = TSA_SUPPRESS_WARNING_FOR_READ(tables).size();
if (!total_tables)
return;
@ -271,7 +273,7 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_rest
try
{
for (const auto & table : tables)
for (const auto & table : TSA_SUPPRESS_WARNING_FOR_READ(tables))
thread_pool.scheduleOrThrowOnError([&]() { startup_one_table(table.second); });
}
catch (...)

View File

@ -148,7 +148,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
if (hosts.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", database_name);
"or if the last replica was just dropped or due to logical error", zookeeper_path);
Int32 cversion = stat.cversion;
::sort(hosts.begin(), hosts.end());
@ -213,7 +213,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
treat_local_port_as_remote,
cluster_auth_info.cluster_secure_connection,
/*priority=*/1,
database_name,
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
cluster_auth_info.cluster_secret);
}
@ -588,7 +588,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
query_context->makeQueryContext();
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setCurrentDatabase(database_name);
query_context->setCurrentDatabase(getDatabaseName());
query_context->setCurrentQueryId("");
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false, "");
query_context->initZooKeeperMetadataTransaction(txn);
@ -608,6 +608,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
/// and make possible creation of new table with the same UUID.
String query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Ordinary", backQuoteIfNeed(to_db_name));
auto query_context = Context::createCopy(getContext());
query_context->setSetting("allow_deprecated_database_ordinary", 1);
executeQuery(query, query_context, true);
/// But we want to avoid discarding UUID of ReplicatedMergeTree tables, because it will not work
@ -811,7 +812,7 @@ void DatabaseReplicated::shutdown()
}
void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
@ -820,7 +821,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
}
DatabaseAtomic::dropTable(local_context, table_name, no_delay);
DatabaseAtomic::dropTable(local_context, table_name, sync);
}
void DatabaseReplicated::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,

View File

@ -30,7 +30,7 @@ public:
String getEngineName() const override { return "Replicated"; }
/// If current query is initial, then the following methods add metadata updating ZooKeeper operations to current ZooKeeperMetadataTransaction.
void dropTable(ContextPtr, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void renameTable(ContextPtr context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,

View File

@ -218,11 +218,11 @@ bool DatabaseWithOwnTablesBase::empty() const
StoragePtr DatabaseWithOwnTablesBase::detachTable(ContextPtr /* context_ */, const String & table_name)
{
std::unique_lock lock(mutex);
return detachTableUnlocked(table_name, lock);
std::lock_guard lock(mutex);
return detachTableUnlocked(table_name);
}
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &)
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name)
{
StoragePtr res;
@ -245,11 +245,11 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::unique_lock lock(mutex);
attachTableUnlocked(table_name, table, lock);
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> &)
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table)
{
auto table_id = table->getStorageID();
if (table_id.database_name != database_name)
@ -313,7 +313,7 @@ DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
}
}
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &) const
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name) const
{
auto it = tables.find(table_name);
if (it != tables.end())

View File

@ -45,14 +45,14 @@ public:
~DatabaseWithOwnTablesBase() override;
protected:
Tables tables;
Tables tables TSA_GUARDED_BY(mutex);
Poco::Logger * log;
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
void attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> & lock);
StoragePtr detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock);
StoragePtr getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock) const;
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
};
}

View File

@ -19,7 +19,7 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
{
if (auto storage = tryGetTable(name, context))
return storage;
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(database_name), backQuoteIfNeed(name));
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
}
ASTPtr IDatabase::getCreateDatabaseQueryForBackup() const

View File

@ -198,7 +198,7 @@ public:
virtual void dropTable( /// NOLINT
ContextPtr /*context*/,
const String & /*name*/,
[[maybe_unused]] bool no_delay = false)
[[maybe_unused]] bool sync = false)
{
throw Exception("There is no DROP TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
}
@ -356,8 +356,8 @@ protected:
}
mutable std::mutex mutex;
String database_name;
String comment;
String database_name TSA_GUARDED_BY(mutex);
String comment TSA_GUARDED_BY(mutex);
};
using DatabasePtr = std::shared_ptr<IDatabase>;

View File

@ -39,7 +39,7 @@ DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
if (!settings->allows_query_when_mysql_lost && exception)
{
@ -59,7 +59,7 @@ void DatabaseMaterializedMySQL::rethrowExceptionIfNeeded() const
void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(mutex);
std::lock_guard lock(mutex);
exception = exception_;
}
@ -80,10 +80,10 @@ void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String &
DatabaseAtomic::createTable(context_, name, table, query);
}
void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & name, bool no_delay)
void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & name, bool sync)
{
checkIsInternalQuery(context_, "DROP TABLE");
DatabaseAtomic::dropTable(context_, name, no_delay);
DatabaseAtomic::dropTable(context_, name, sync);
}
void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)

View File

@ -52,7 +52,7 @@ public:
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void dropTable(ContextPtr context_, const String & name, bool no_delay) override;
void dropTable(ContextPtr context_, const String & name, bool sync) override;
void attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) override;

View File

@ -447,7 +447,7 @@ void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name
table_iter->second.second->is_dropped = true;
}
void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*no_delay*/)
void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
{
detachTablePermanently(local_context, table_name);
}

View File

@ -82,7 +82,7 @@ public:
void detachTablePermanently(ContextPtr context, const String & table_name) override;
void dropTable(ContextPtr context, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
@ -109,15 +109,15 @@ private:
void cleanOutdatedTables();
void fetchTablesIntoLocalCache(ContextPtr context) const;
void fetchTablesIntoLocalCache(ContextPtr context) const TSA_REQUIRES(mutex);
std::map<String, UInt64> fetchTablesWithModificationTime(ContextPtr local_context) const;
std::map<String, ColumnsDescription> fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr context) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const TSA_REQUIRES(mutex);
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, ContextPtr context) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, ContextPtr context) const TSA_REQUIRES(mutex);
ThreadFromGlobalPool thread;
};

View File

@ -63,9 +63,9 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
return;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
/* replication_identifier */ TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
remote_database_name,
database_name,
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
connection_info,
getContext(),
is_attach,
@ -99,7 +99,8 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
else
{
/// Nested table does not exist and will be created by replication thread.
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
/// FIXME TSA
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(TSA_SUPPRESS_WARNING_FOR_READ(database_name), table_name), getContext(), remote_database_name, table_name);
}
/// Cache MaterializedPostgreSQL wrapper over nested table.
@ -210,7 +211,8 @@ ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & ta
std::lock_guard lock(handler_mutex);
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
/// FIXME TSA
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(TSA_SUPPRESS_WARNING_FOR_READ(database_name), table_name), getContext(), remote_database_name, table_name);
auto ast_storage = replication_handler->getCreateNestedTableQuery(storage.get(), table_name);
assert_cast<ASTCreateQuery *>(ast_storage.get())->uuid = UUIDHelpers::generateV4();
return ast_storage;
@ -234,7 +236,7 @@ ASTPtr DatabaseMaterializedPostgreSQL::createAlterSettingsQuery(const SettingCha
auto * alter = query->as<ASTAlterQuery>();
alter->alter_object = ASTAlterQuery::AlterObjectType::DATABASE;
alter->setDatabase(database_name);
alter->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name)); /// FIXME
alter->set(alter->command_list, command_list);
return query;
@ -390,10 +392,10 @@ void DatabaseMaterializedPostgreSQL::stopReplication()
}
void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
/// Modify context into nested_context and pass query to Atomic database.
DatabaseAtomic::dropTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, no_delay);
DatabaseAtomic::dropTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, sync);
}

View File

@ -55,7 +55,7 @@ public:
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void dropTable(ContextPtr local_context, const String & name, bool no_delay) override;
void dropTable(ContextPtr local_context, const String & name, bool sync) override;
void drop(ContextPtr local_context) override;

View File

@ -264,7 +264,7 @@ void DatabasePostgreSQL::createTable(ContextPtr local_context, const String & ta
}
void DatabasePostgreSQL::dropTable(ContextPtr, const String & table_name, bool /* no_delay */)
void DatabasePostgreSQL::dropTable(ContextPtr, const String & table_name, bool /* sync */)
{
std::lock_guard<std::mutex> lock{mutex};
@ -369,7 +369,11 @@ ASTPtr DatabasePostgreSQL::getCreateDatabaseQuery() const
ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
StoragePtr storage;
{
std::lock_guard lock{mutex};
storage = fetchTable(table_name, local_context, false);
}
if (!storage)
{
if (throw_on_error)

View File

@ -53,7 +53,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(ContextPtr, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
@ -81,7 +81,7 @@ private:
bool checkPostgresTable(const String & table_name) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const TSA_REQUIRES(mutex);
void removeOutdatedTables();

View File

@ -173,12 +173,16 @@ ASTPtr DatabaseSQLite::getCreateDatabaseQuery() const
ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
StoragePtr storage;
{
std::lock_guard<std::mutex> lock(mutex);
storage = fetchTable(table_name, local_context, false);
}
if (!storage)
{
if (throw_on_error)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "SQLite table {}.{} does not exist",
database_name, table_name);
getDatabaseName(), table_name);
return nullptr;
}
auto table_storage_define = database_engine_define->clone();

View File

@ -54,9 +54,9 @@ private:
bool checkSQLiteTable(const String & table_name) const;
NameSet fetchTablesList() const;
NameSet fetchTablesList() const TSA_REQUIRES(mutex);
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const TSA_REQUIRES(mutex);
};

View File

@ -12,7 +12,7 @@
namespace Poco
{
class Logger;
class Logger; // NOLINT(cppcoreguidelines-virtual-class-destructor)
}
class AtomicStopwatch;

View File

@ -135,14 +135,14 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::updateThreadFunction()
/// Notify thread about finished updating the bunch of ids
/// where their own ids were included.
std::unique_lock<std::mutex> lock(update_mutex);
std::lock_guard lock(update_mutex);
unit_to_update->is_done = true;
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
std::lock_guard lock(update_mutex);
unit_to_update->current_exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
is_update_finished.notify_all();

View File

@ -90,7 +90,7 @@ DiskCacheWrapper::DiskCacheWrapper(
std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(const String & path) const
{
std::unique_lock<std::mutex> lock{mutex};
std::lock_guard lock{mutex};
auto it = file_downloads.find(path);
if (it != file_downloads.end())
@ -101,7 +101,7 @@ std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(
new FileDownloadMetadata,
[this, path] (FileDownloadMetadata * p)
{
std::unique_lock<std::mutex> erase_lock{mutex};
std::lock_guard erase_lock{mutex};
file_downloads.erase(path);
delete p;
});

View File

@ -56,7 +56,7 @@ void MetadataStorageFromDiskTransaction::commit()
toString(state), toString(MetadataFromDiskTransactionState::PREPARING));
{
std::unique_lock lock(metadata_storage.metadata_mutex);
std::lock_guard lock(metadata_storage.metadata_mutex);
for (size_t i = 0; i < operations.size(); ++i)
{
try

View File

@ -28,7 +28,7 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
{
LOG_DEBUG(&Poco::Logger::get("AWSClient"), "Obtain proxy using resolver: {}", endpoint.toString());
std::unique_lock lock(cache_mutex);
std::lock_guard lock(cache_mutex);
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
@ -110,7 +110,7 @@ void ProxyResolverConfiguration::errorReport(const ClientConfigurationPerRequest
if (config.proxy_host.empty())
return;
std::unique_lock lock(cache_mutex);
std::lock_guard lock(cache_mutex);
if (!cache_ttl.count() || !cache_valid)
return;

View File

@ -36,7 +36,7 @@ struct FormatSettings
bool seekable_read = true;
UInt64 max_rows_to_read_for_schema_inference = 100;
String column_names_for_schema_inference = "";
String column_names_for_schema_inference;
enum class DateTimeInputFormat
{

View File

@ -181,7 +181,7 @@ public:
}
private:
void nextImpl() override final
void nextImpl() final
{
const size_t prev_size = Base::position() - memory.data();
memory.resize(2 * prev_size + 1);

View File

@ -212,7 +212,7 @@ namespace
size_t max_connections_per_endpoint,
bool resolve_host = true)
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);

View File

@ -148,7 +148,7 @@ public:
{
String credentials_string;
{
std::unique_lock<std::recursive_mutex> locker(token_mutex);
std::lock_guard locker(token_mutex);
LOG_TRACE(logger, "Getting default credentials for EC2 instance.");
auto result = GetResourceWithAWSWebServiceResult(endpoint.c_str(), EC2_SECURITY_CREDENTIALS_RESOURCE, nullptr);
@ -194,7 +194,7 @@ public:
String new_token;
{
std::unique_lock<std::recursive_mutex> locker(token_mutex);
std::lock_guard locker(token_mutex);
Aws::StringStream ss;
ss << endpoint << EC2_IMDS_TOKEN_RESOURCE;

View File

@ -1,11 +1,10 @@
#pragma once
#include <algorithm>
#include <cstring>
#include <memory>
#include <iostream>
#include <cassert>
#include <string.h>
#include <cstring>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>

View File

@ -64,7 +64,7 @@ public:
}
private:
void finalizeImpl() override final
void finalizeImpl() override
{
vector.resize(
((position() - reinterpret_cast<Position>(vector.data())) /// NOLINT

View File

@ -215,7 +215,7 @@ void AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
}
}
std::unique_lock write_lock(rwlock);
std::lock_guard write_lock(rwlock);
auto it = queue.emplace(key, std::make_shared<Container>()).first;
pushImpl(std::move(entry), it);
}
@ -343,7 +343,7 @@ void AsynchronousInsertQueue::cleanup()
if (!keys_to_remove.empty())
{
std::unique_lock write_lock(rwlock);
std::lock_guard write_lock(rwlock);
size_t total_removed = 0;
for (const auto & key : keys_to_remove)

View File

@ -1,7 +1,6 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Common/RWLock.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>

View File

@ -124,7 +124,7 @@ protected:
std::string queue_dir; /// dir with queue of queries
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
/// Save state of executed task to avoid duplicate execution on ZK error
std::optional<String> last_skipped_entry_name;

View File

@ -205,7 +205,10 @@ void DatabaseCatalog::shutdownImpl()
for (auto & database : current_databases)
database.second->shutdown();
tables_marked_dropped.clear();
{
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.clear();
}
std::lock_guard lock(databases_mutex);
for (const auto & db : databases)
@ -223,6 +226,7 @@ void DatabaseCatalog::shutdownImpl()
auto & table = mapping.second.second;
return db || table;
};
std::lock_guard map_lock{elem.mutex};
auto it = std::find_if(elem.map.begin(), elem.map.end(), not_empty_mapping);
return it != elem.map.end();
}) == uuid_map.end());
@ -689,7 +693,8 @@ DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID &
DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & table)
{
std::unique_lock lock(ddl_guards_mutex);
auto db_guard_iter = ddl_guards.try_emplace(database).first;
/// TSA does not support unique_lock
auto db_guard_iter = TSA_SUPPRESS_WARNING_FOR_WRITE(ddl_guards).try_emplace(database).first;
DatabaseGuard & db_guard = db_guard_iter->second;
return std::make_unique<DDLGuard>(db_guard.first, db_guard.second, std::move(lock), table, database);
}
@ -698,7 +703,7 @@ std::unique_lock<std::shared_mutex> DatabaseCatalog::getExclusiveDDLGuardForData
{
DDLGuards::iterator db_guard_iter;
{
std::unique_lock lock(ddl_guards_mutex);
std::lock_guard lock(ddl_guards_mutex);
db_guard_iter = ddl_guards.try_emplace(database).first;
assert(db_guard_iter->second.first.contains(""));
}
@ -999,7 +1004,7 @@ void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
LOG_DEBUG(log, "Waiting for table {} to be finally dropped", toString(uuid));
std::unique_lock lock{tables_marked_dropped_mutex};
wait_table_finally_dropped.wait(lock, [&]()
wait_table_finally_dropped.wait(lock, [&]() TSA_REQUIRES(tables_marked_dropped_mutex) -> bool
{
return !tables_marked_dropped_ids.contains(uuid);
});

View File

@ -221,7 +221,7 @@ public:
DependenciesInfo getLoadingDependenciesInfo(const StorageID & table_id) const;
TableNamesSet tryRemoveLoadingDependencies(const StorageID & table_id, bool check_dependencies, bool is_drop_database = false);
TableNamesSet tryRemoveLoadingDependenciesUnlocked(const QualifiedTableName & removing_table, bool check_dependencies, bool is_drop_database = false);
TableNamesSet tryRemoveLoadingDependenciesUnlocked(const QualifiedTableName & removing_table, bool check_dependencies, bool is_drop_database = false) TSA_REQUIRES(databases_mutex);
void checkTableCanBeRemovedOrRenamed(const StorageID & table_id) const;
void updateLoadingDependencies(const StorageID & table_id, TableNamesSet && new_dependencies);
@ -233,15 +233,15 @@ private:
static std::unique_ptr<DatabaseCatalog> database_catalog;
explicit DatabaseCatalog(ContextMutablePtr global_context_);
void assertDatabaseExistsUnlocked(const String & database_name) const;
void assertDatabaseDoesntExistUnlocked(const String & database_name) const;
void assertDatabaseExistsUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex);
void assertDatabaseDoesntExistUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex);
void shutdownImpl();
struct UUIDToStorageMapPart
{
std::unordered_map<UUID, DatabaseAndTable> map;
std::unordered_map<UUID, DatabaseAndTable> map TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};
@ -273,12 +273,12 @@ private:
mutable std::mutex databases_mutex;
ViewDependencies view_dependencies;
ViewDependencies view_dependencies TSA_GUARDED_BY(databases_mutex);
Databases databases;
Databases databases TSA_GUARDED_BY(databases_mutex);
UUIDToStorageMap uuid_map;
DependenciesInfos loading_dependencies;
DependenciesInfos loading_dependencies TSA_GUARDED_BY(databases_mutex);
Poco::Logger * log;
@ -290,12 +290,12 @@ private:
/// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below.
using DatabaseGuard = std::pair<DDLGuard::Map, std::shared_mutex>;
using DDLGuards = std::map<String, DatabaseGuard>;
DDLGuards ddl_guards;
DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex);
/// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.
mutable std::mutex ddl_guards_mutex;
TablesMarkedAsDropped tables_marked_dropped;
std::unordered_set<UUID> tables_marked_dropped_ids;
TablesMarkedAsDropped tables_marked_dropped TSA_GUARDED_BY(tables_marked_dropped_mutex);
std::unordered_set<UUID> tables_marked_dropped_ids TSA_GUARDED_BY(tables_marked_dropped_mutex);
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;

View File

@ -60,7 +60,7 @@ bool EmbeddedDictionaries::reloadDictionary(
bool EmbeddedDictionaries::reloadImpl(const bool throw_on_error, const bool force_reload)
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
/** If you can not update the directories, then despite this, do not throw an exception (use the old directories).
* If there are no old correct directories, then when using functions that depend on them,

View File

@ -16,7 +16,7 @@
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/RWLock.h>
#include <Storages/TableLockHolder.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
@ -339,7 +339,7 @@ public:
/// We keep correspondence between used_flags and hash table internal buffer.
/// Hash table cannot be modified during HashJoin lifetime and must be protected with lock.
void setLock(RWLockImpl::LockHolder rwlock_holder)
void setLock(TableLockHolder rwlock_holder)
{
storage_join_lock = rwlock_holder;
}
@ -394,7 +394,7 @@ private:
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
RWLockImpl::LockHolder storage_join_lock = nullptr;
TableLockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);

View File

@ -0,0 +1,77 @@
#include <Interpreters/InterpreterCreateIndexQuery.h>
#include <Access/ContextAccess.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Storages/AlterCommands.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
}
BlockIO InterpreterCreateIndexQuery::execute()
{
const auto & create_index = query_ptr->as<ASTCreateIndexQuery &>();
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_ADD_INDEX, create_index.getDatabase(), create_index.getTable());
if (!create_index.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(required_access);
auto table_id = getContext()->resolveStorageID(create_index, Context::ResolveOrdinary);
query_ptr->as<ASTCreateIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
/// Convert ASTCreateIndexQuery to AlterCommand.
AlterCommands alter_commands;
AlterCommand command;
command.index_decl = create_index.index_decl;
command.type = AlterCommand::ADD_INDEX;
command.index_name = create_index.index_name->as<ASTIdentifier &>().name();
command.if_not_exists = create_index.if_not_exists;
/// Fill name in ASTIndexDeclaration
auto & ast_index_decl = command.index_decl->as<ASTIndexDeclaration &>();
ast_index_decl.name = command.index_name;
alter_commands.emplace_back(std::move(command));
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(table, getContext());
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, getContext());
table->alter(alter_commands, getContext(), alter_lock);
return {};
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
class InterpreterCreateIndexQuery : public IInterpreter, WithContext
{
public:
InterpreterCreateIndexQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
};
}

View File

@ -150,10 +150,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
/// When attaching old-style database during server startup, we must always use Ordinary engine
if (create.attach)
throw Exception("Database engine must be specified for ATTACH DATABASE query", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
bool old_style_database = getContext()->getSettingsRef().default_database_engine.value == DefaultDatabaseEngine::Ordinary;
auto engine = std::make_shared<ASTFunction>();
auto storage = std::make_shared<ASTStorage>();
engine->name = old_style_database ? "Ordinary" : "Atomic";
engine->name = "Atomic";
engine->no_empty_args = true;
storage->set(storage->engine, engine);
create.set(create.storage, storage);
@ -196,8 +195,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (create_from_user)
{
const auto & default_engine = getContext()->getSettingsRef().default_database_engine.value;
if (create.uuid == UUIDHelpers::Nil && default_engine == DefaultDatabaseEngine::Atomic)
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4(); /// Will enable Atomic engine for nested database
}
else if (attach_from_user && create.uuid == UUIDHelpers::Nil)

View File

@ -0,0 +1,71 @@
#include <Access/ContextAccess.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/InterpreterDropIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/AlterCommands.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
}
BlockIO InterpreterDropIndexQuery::execute()
{
const auto & drop_index = query_ptr->as<ASTDropIndexQuery &>();
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_DROP_INDEX, drop_index.getDatabase(), drop_index.getTable());
if (!drop_index.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(required_access);
auto table_id = getContext()->resolveStorageID(drop_index, Context::ResolveOrdinary);
query_ptr->as<ASTDropIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
/// Convert ASTDropIndexQuery to AlterCommand.
AlterCommands alter_commands;
AlterCommand command;
command.ast = drop_index.convertToASTAlterCommand();
command.type = AlterCommand::DROP_INDEX;
command.index_name = drop_index.index_name->as<ASTIdentifier &>().name();
command.if_exists = drop_index.if_exists;
alter_commands.emplace_back(std::move(command));
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(table, getContext());
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, getContext());
table->alter(alter_commands, getContext(), alter_lock);
return {};
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
class InterpreterDropIndexQuery : public IInterpreter, WithContext
{
public:
InterpreterDropIndexQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
};
}

View File

@ -62,7 +62,7 @@ BlockIO InterpreterDropQuery::execute()
}
if (getContext()->getSettingsRef().database_atomic_wait_for_drop_and_detach_synchronously)
drop.no_delay = true;
drop.sync = true;
if (drop.table)
return executeToTable(drop);
@ -89,7 +89,7 @@ BlockIO InterpreterDropQuery::executeToTable(ASTDropQuery & query)
DatabasePtr database;
UUID table_to_wait_on = UUIDHelpers::Nil;
auto res = executeToTableImpl(getContext(), query, database, table_to_wait_on);
if (query.no_delay)
if (query.sync)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_to_wait_on);
return res;
}
@ -244,7 +244,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
DatabaseCatalog::instance().tryRemoveLoadingDependencies(table_id, getContext()->getSettingsRef().check_table_dependencies,
is_drop_or_detach_database);
database->dropTable(context_, table_id.table_name, query.no_delay);
database->dropTable(context_, table_id.table_name, query.sync);
}
db = database;
@ -300,7 +300,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
}
catch (...)
{
if (query.no_delay)
if (query.sync)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
@ -308,7 +308,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
throw;
}
if (query.no_delay)
if (query.sync)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
@ -345,7 +345,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
query_for_table.kind = query.kind;
query_for_table.if_exists = true;
query_for_table.setDatabase(database_name);
query_for_table.no_delay = query.no_delay;
query_for_table.sync = query.sync;
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
/// since in this case getTablesIterator() may do some additional work,
@ -368,7 +368,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
}
}
if (!drop && query.no_delay)
if (!drop && query.sync)
{
/// Avoid "some tables are still in use" when sync mode is enabled
for (const auto & table_uuid : uuids_to_wait)
@ -428,7 +428,7 @@ void InterpreterDropQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
elem.query_kind = "Drop";
}
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool no_delay)
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool sync)
{
if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context))
{
@ -437,7 +437,7 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr
drop_query->setDatabase(target_table_id.database_name);
drop_query->setTable(target_table_id.table_name);
drop_query->kind = kind;
drop_query->no_delay = no_delay;
drop_query->sync = sync;
drop_query->if_exists = true;
ASTPtr ast_drop_query = drop_query;
/// FIXME We have to use global context to execute DROP query for inner table

View File

@ -26,7 +26,7 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override;
static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool no_delay);
static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool sync);
private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const;

View File

@ -3,7 +3,9 @@
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTInsertQuery.h>
@ -42,10 +44,12 @@
#include <Interpreters/InterpreterBackupQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Interpreters/InterpreterCreateIndexQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterDescribeCacheQuery.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/InterpreterDropIndexQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterExplainQuery.h>
@ -298,6 +302,14 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
{
return std::make_unique<InterpreterDropFunctionQuery>(query, context);
}
else if (query->as<ASTCreateIndexQuery>())
{
return std::make_unique<InterpreterCreateIndexQuery>(query, context);
}
else if (query->as<ASTDropIndexQuery>())
{
return std::make_unique<InterpreterDropIndexQuery>(query, context);
}
else if (query->as<ASTBackupQuery>())
{
return std::make_unique<InterpreterBackupQuery>(query, context);

Some files were not shown because too many files have changed in this diff Show More