Merge branch 'master' into keeper-dont-rollback-session-id

This commit is contained in:
mergify[bot] 2022-06-30 07:16:43 +00:00 committed by GitHub
commit 1795f863a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
362 changed files with 2208 additions and 626 deletions

View File

@ -124,21 +124,37 @@
#endif
#endif
// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
/// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
/// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
#if defined(__clang__)
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) // data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) // pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) // thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) // thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) // annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) // disable TSA for a function
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) /// data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) /// pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) /// thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) /// thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) /// annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) /// disable TSA for a function
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// Consider adding a comment before using these macros.
# define TSA_SUPPRESS_WARNING_FOR_READ(x) [&]() TSA_NO_THREAD_SAFETY_ANALYSIS -> const auto & { return (x); }()
# define TSA_SUPPRESS_WARNING_FOR_WRITE(x) [&]() TSA_NO_THREAD_SAFETY_ANALYSIS -> auto & { return (x); }()
/// This macro is useful when only one thread writes to a member
/// and you want to read this member from the same thread without locking a mutex.
/// It's safe (because no concurrent writes are possible), but TSA generates a warning.
/// (Seems like there's no way to verify it, but it makes sense to distinguish it from TSA_SUPPRESS_WARNING_FOR_READ for readability)
# define TSA_READ_ONE_THREAD(x) TSA_SUPPRESS_WARNING_FOR_READ(x)
#else
# define TSA_GUARDED_BY(...)
# define TSA_PT_GUARDED_BY(...)
# define TSA_REQUIRES(...)
# define TSA_REQUIRES_SHARED(...)
# define TSA_NO_THREAD_SAFETY_ANALYSIS
# define TSA_SUPPRESS_WARNING_FOR_READ(x)
# define TSA_SUPPRESS_WARNING_FOR_WRITE(x)
# define TSA_READ_ONE_THREAD(x)
#endif
/// A template function for suppressing warnings about unused variables or function results.

@ -1 +1 @@
Subproject commit 5f4034a3a6376416504f17186c55fe401c6d8e5e
Subproject commit e39608998f5f6944ece9ec61f48e9172ec1de660

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit de35b9fd72b57127abdc3a5beaf0e320d767e356
Subproject commit 0e32cb42db76ddaa76848470219056908053b676

View File

@ -11,6 +11,7 @@ add_library (_poco_xml_expat ${SRCS_EXPAT})
add_library (Poco::XML::Expat ALIAS _poco_xml_expat)
target_include_directories (_poco_xml_expat PUBLIC "${LIBRARY_DIR}/XML/include")
target_include_directories (_poco_xml_expat PRIVATE "${LIBRARY_DIR}/Foundation/include")
# Poco::XML

View File

@ -73,6 +73,9 @@ function download
) &
wait
echo "ATTACH DATABASE default ENGINE=Ordinary" > db0/metadata/default.sql
echo "ATTACH DATABASE datasets ENGINE=Ordinary" > db0/metadata/datasets.sql
ls db0/metadata
}
download

View File

@ -120,6 +120,10 @@ function run_tests()
ADDITIONAL_OPTIONS+=('--replicated-database')
fi
if [[ -n "$USE_DATABASE_ORDINARY" ]] && [[ "$USE_DATABASE_ORDINARY" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--db-engine=Ordinary')
fi
set +e
clickhouse-test -j 2 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time \
--skip 00168_parallel_processing_on_replicas "${ADDITIONAL_OPTIONS[@]}" \

View File

@ -115,6 +115,10 @@ function run_tests()
ADDITIONAL_OPTIONS+=("$RUN_BY_HASH_TOTAL")
fi
if [[ -n "$USE_DATABASE_ORDINARY" ]] && [[ "$USE_DATABASE_ORDINARY" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--db-engine=Ordinary')
fi
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \

View File

@ -284,6 +284,11 @@ then
rm -rf /var/lib/clickhouse/*
# Make BC check more funny by forcing Ordinary engine for system database
# New version will try to convert it to Atomic on startup
mkdir /var/lib/clickhouse/metadata
echo "ATTACH DATABASE system ENGINE=Ordinary" > /var/lib/clickhouse/metadata/system.sql
# Install previous release packages
install_packages previous_release_package_folder

View File

@ -3,6 +3,74 @@ sidebar_label: Geo
sidebar_position: 62
---
# Geo Functions
# Geo Functions
## Geographical Coordinates Functions
- [greatCircleDistance](./coordinates.md#greatCircleDistance)
- [geoDistance](./coordinates.md#geoDistance)
- [greatCircleAngle](./coordinates.md#greatCircleAngle)
- [pointInEllipses](./coordinates.md#pointInEllipses)
- [pointInPolygon](./coordinates.md#pointInPolygon)
## Geohash Functions
- [geohashEncode](./geohash.md#geohashEncode)
- [geohashDecode](./geohash.md#geohashDecode)
- [geohashesInBox](./geohash.md#geohashesInBox)
## H3 Indexes Functions
- [h3IsValid](./h3.md#h3IsValid)
- [h3GetResolution](./h3.md#h3GetResolution)
- [h3EdgeAngle](./h3.md#h3EdgeAngle)
- [h3EdgeLengthM](./h3.md#h3EdgeLengthM)
- [h3EdgeLengthKm](./h3.md#h3EdgeLengthKm)
- [geoToH3](./h3.md#geoToH3)
- [h3ToGeo](./h3.md#h3ToGeo)
- [h3ToGeoBoundary](./h3.md#h3ToGeoBoundary)
- [h3kRing](./h3.md#h3kRing)
- [h3GetBaseCell](./h3.md#h3GetBaseCell)
- [h3HexAreaM2](./h3.md#h3HexAreaM2)
- [h3HexAreaKm2](./h3.md#h3HexAreaKm2)
- [h3IndexesAreNeighbors](./h3.md#h3IndexesAreNeighbors)
- [h3ToChildren](./h3.md#h3ToChildren)
- [h3ToParent](./h3.md#h3ToParent)
- [h3ToString](./h3.md#h3ToString)
- [stringToH3](./h3.md#stringToH3)
- [h3GetResolution](./h3.md#h3GetResolution)
- [h3IsResClassIII](./h3.md#h3IsResClassIII)
- [h3IsPentagon](./h3.md#h3IsPentagon)
- [h3GetFaces](./h3.md#h3GetFaces)
- [h3CellAreaM2](./h3.md#h3CellAreaM2)
- [h3CellAreaRads2](./h3.md#h3CellAreaRads2)
- [h3ToCenterChild](./h3.md#h3ToCenterChild)
- [h3ExactEdgeLengthM](./h3.md#h3ExactEdgeLengthM)
- [h3ExactEdgeLengthKm](./h3.md#h3ExactEdgeLengthKm)
- [h3ExactEdgeLengthRads](./h3.md#h3ExactEdgeLengthRads)
- [h3NumHexagons](./h3.md#h3NumHexagons)
- [h3Line](./h3.md#h3Line)
- [h3Distance](./h3.md#h3Distance)
- [h3HexRing](./h3.md#h3HexRing)
- [h3GetUnidirectionalEdge](./h3.md#h3GetUnidirectionalEdge)
- [h3UnidirectionalEdgeIsValid](./h3.md#h3UnidirectionalEdgeIsValid)
- [h3GetOriginIndexFromUnidirectionalEdge](./h3.md#h3GetOriginIndexFromUnidirectionalEdge)
- [h3GetDestinationIndexFromUnidirectionalEdge](./h3.md#h3GetDestinationIndexFromUnidirectionalEdge)
- [h3GetIndexesFromUnidirectionalEdge](./h3.md#h3GetIndexesFromUnidirectionalEdge)
- [h3GetUnidirectionalEdgesFromHexagon](./h3.md#h3GetUnidirectionalEdgesFromHexagon)
- [h3GetUnidirectionalEdgeBoundary](./h3.md#h3GetUnidirectionalEdgeBoundary)
## S2 Index Functions
- [geoToS2](./s2.md#geoToS2)
- [s2ToGeo](./s2.md#s2ToGeo)
- [s2GetNeighbors](./s2.md#s2GetNeighbors)
- [s2CellsIntersect](./s2.md#s2CellsIntersect)
- [s2CapContains](./s2.md#s2CapContains)
- [s2CapUnion](./s2.md#s2CapUnion)
- [s2RectAdd](./s2.md#s2RectAdd)
- [s2RectContains](./s2.md#s2RectContains)
- [s2RectUinion](./s2.md#s2RectUinion)
- [s2RectIntersection](./s2.md#s2RectIntersection)
[Original article](https://clickhouse.com/docs/en/sql-reference/functions/geo/) <!--hide-->

View File

@ -16,7 +16,14 @@ sidebar_label: 使用教程
例如,您选择`deb`安装包,执行:
``` bash
{% include 'install/deb.sh' %}
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
```
在我们安装的软件中包含这些包:

View File

@ -1488,6 +1488,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// We load temporary database first, because projections need it.
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertOrdinaryDatabaseToAtomic(global_context, DatabaseCatalog::instance().getSystemDatabase());
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();

View File

@ -445,7 +445,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Seconds, wait_for_window_view_fire_signal_timeout, 10, "Timeout for waiting for window view fire signal in event time processing", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic, "Default database engine.", 0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::None, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(Bool, show_table_uuid_in_table_create_query_if_not_nil, false, "For tables in databases with Engine=Atomic show UUID of the table in its CREATE query.", 0) \
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \
@ -640,6 +639,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
MAKE_OBSOLETE(M, UInt64, background_schedule_pool_size, 128) \
MAKE_OBSOLETE(M, UInt64, background_message_broker_schedule_pool_size, 16) \
MAKE_OBSOLETE(M, UInt64, background_distributed_schedule_pool_size, 16) \
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@ -1,8 +1,10 @@
#include <cstring>
#include <memory>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include "Columns/IColumn.h"
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
@ -17,7 +19,6 @@
#include <boost/algorithm/string/case_conv.hpp>
namespace DB
{
@ -76,8 +77,7 @@ Block flatten(const Block & block)
for (const auto & elem : block)
{
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(elem.type.get());
if (type_arr)
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(elem.type.get()))
{
const DataTypeTuple * type_tuple = typeid_cast<const DataTypeTuple *>(type_arr->getNestedType().get());
if (type_tuple && type_tuple->haveExplicitNames())
@ -106,7 +106,7 @@ Block flatten(const Block & block)
res.insert(ColumnWithTypeAndName(
is_const
? ColumnConst::create(std::move(column_array_of_element), block.rows())
: std::move(column_array_of_element),
: column_array_of_element,
std::make_shared<DataTypeArray>(element_types[i]),
nested_name));
}
@ -114,6 +114,28 @@ Block flatten(const Block & block)
else
res.insert(elem);
}
else if (const DataTypeTuple * type_tuple = typeid_cast<const DataTypeTuple *>(elem.type.get()))
{
if (type_tuple->haveExplicitNames())
{
const DataTypes & element_types = type_tuple->getElements();
const Strings & names = type_tuple->getElementNames();
const ColumnTuple * column_tuple;
if (isColumnConst(*elem.column))
column_tuple = typeid_cast<const ColumnTuple *>(&assert_cast<const ColumnConst &>(*elem.column).getDataColumn());
else
column_tuple = typeid_cast<const ColumnTuple *>(elem.column.get());
size_t tuple_size = column_tuple->tupleSize();
for (size_t i = 0; i < tuple_size; ++i)
{
const auto & element_column = column_tuple->getColumn(i);
String nested_name = concatenateName(elem.name, names[i]);
res.insert(ColumnWithTypeAndName(element_column.getPtr(), element_types[i], nested_name));
}
}
else
res.insert(elem);
}
else
res.insert(elem);
}
@ -243,7 +265,71 @@ std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_c
}
return nested_table_names;
}
}
NestedColumnExtractHelper::NestedColumnExtractHelper(const Block & block_, bool case_insentive_)
: block(block_)
, case_insentive(case_insentive_)
{}
std::optional<ColumnWithTypeAndName> NestedColumnExtractHelper::extractColumn(const String & column_name)
{
if (block.has(column_name, case_insentive))
return {block.getByName(column_name, case_insentive)};
auto nested_names = Nested::splitName(column_name);
if (case_insentive)
{
boost::to_lower(nested_names.first);
boost::to_lower(nested_names.second);
}
if (!block.has(nested_names.first, case_insentive))
return {};
if (!nested_tables.contains(nested_names.first))
{
ColumnsWithTypeAndName columns = {block.getByName(nested_names.first, case_insentive)};
nested_tables[nested_names.first] = std::make_shared<Block>(Nested::flatten(columns));
}
return extractColumn(column_name, nested_names.first, nested_names.second);
}
std::optional<ColumnWithTypeAndName> NestedColumnExtractHelper::extractColumn(
const String & original_column_name, const String & column_name_prefix, const String & column_name_suffix)
{
auto table_iter = nested_tables.find(column_name_prefix);
if (table_iter == nested_tables.end())
{
return {};
}
auto & nested_table = table_iter->second;
auto nested_names = Nested::splitName(column_name_suffix);
auto new_column_name_prefix = Nested::concatenateName(column_name_prefix, nested_names.first);
if (nested_names.second.empty())
{
if (auto * column_ref = nested_table->findByName(new_column_name_prefix, case_insentive))
{
ColumnWithTypeAndName column = *column_ref;
if (case_insentive)
column.name = original_column_name;
return {std::move(column)};
}
else
{
return {};
}
}
if (!nested_table->has(new_column_name_prefix, case_insentive))
{
return {};
}
ColumnsWithTypeAndName columns = {nested_table->getByName(new_column_name_prefix, case_insentive)};
Block sub_block(columns);
nested_tables[new_column_name_prefix] = std::make_shared<Block>(Nested::flatten(sub_block));
return extractColumn(original_column_name, new_column_name_prefix, nested_names.second);
}
}

View File

@ -18,8 +18,9 @@ namespace Nested
/// Returns the prefix of the name to the first '.'. Or the name is unchanged if there is no dot.
std::string extractTableName(const std::string & nested_name);
/// Replace Array(Tuple(...)) columns to a multiple of Array columns in a form of `column_name.element_name`.
/// only for named tuples that actually represent Nested structures.
/// Flat a column of nested type into columns
/// 1) For named tuplest Tuple(x .., y ..., ...), replace it with t.x ..., t.y ... , ...
/// 2) For an Array with named Tuple element column, a Array(Tuple(x ..., y ..., ...)), replace it with multiple Array Columns, a.x ..., a.y ..., ...
Block flatten(const Block & block);
/// Collect Array columns in a form of `column_name.element_name` to single Array(Tuple(...)) column.
@ -35,4 +36,20 @@ namespace Nested
std::unordered_set<String> getAllTableNames(const Block & block, bool to_lower_case = false);
}
/// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple.
/// It can extract a column from a multiple nested type column, e.g. named Tuple in named Tuple
/// Keeps some intermediate data to avoid rebuild them multi-times.
class NestedColumnExtractHelper
{
public:
explicit NestedColumnExtractHelper(const Block & block_, bool case_insentive_);
std::optional<ColumnWithTypeAndName> extractColumn(const String & column_name);
private:
std::optional<ColumnWithTypeAndName>
extractColumn(const String & original_column_name, const String & column_name_prefix, const String & column_name_suffix);
const Block & block;
bool case_insentive;
std::map<String, BlockPtr> nested_tables;
};
}

View File

@ -73,7 +73,7 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const
void DatabaseAtomic::drop(ContextPtr)
{
assert(tables.empty());
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
try
{
fs::remove(path_to_metadata_symlink);
@ -90,40 +90,40 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseOrdinary::attachTableUnlocked(name, table, lock);
DatabaseOrdinary::attachTableUnlocked(name, table);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name)
{
DetachedTables not_in_use;
std::unique_lock lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name, lock);
std::lock_guard lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleanupDetachedTables(); //-V1001
return table;
}
void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
auto table = tryGetTable(table_name, local_context);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread)
if (table)
table->dropInnerTableIfAny(no_delay, local_context);
table->dropInnerTableIfAny(sync, local_context);
else
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
backQuote(database_name), backQuote(table_name));
backQuote(getDatabaseName()), backQuote(table_name));
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop;
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
auto txn = local_context->getZooKeeperMetadataTransaction();
if (txn && !local_context->isInternalSubquery())
@ -136,7 +136,7 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
/// TODO better detection and recovery
fs::rename(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped
DatabaseOrdinary::detachTableUnlocked(table_name, lock); /// Should never throw
DatabaseOrdinary::detachTableUnlocked(table_name); /// Should never throw
table_name_to_path.erase(table_name);
}
@ -145,11 +145,12 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
/// Notify DatabaseCatalog that table was dropped. It will remove table data in background.
/// Cleanup is performed outside of database to allow easily DROP DATABASE without waiting for cleanup to complete.
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, no_delay);
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, sync);
}
void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary)
TSA_NO_THREAD_SAFETY_ANALYSIS /// TSA does not support conditional locking
{
if (typeid(*this) != typeid(to_database))
{
@ -173,7 +174,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink)
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink) TSA_REQUIRES(db.mutex)
{
auto it = db.table_name_to_path.find(table_name_);
String table_data_path_saved;
@ -188,7 +189,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
return table_data_path_saved;
};
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_)
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_) TSA_REQUIRES(db.mutex)
{
db.tables.emplace(table_name_, table_);
if (table_data_path_.empty())
@ -229,9 +230,9 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
}
if (!exchange)
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name, inside_database ? db_lock : other_db_lock);
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name);
StoragePtr table = getTableUnlocked(table_name, db_lock);
StoragePtr table = getTableUnlocked(table_name);
if (dictionary && !table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
@ -244,7 +245,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
StorageID other_table_new_id = StorageID::createEmpty();
if (exchange)
{
other_table = other_db.getTableUnlocked(to_table_name, other_db_lock);
other_table = other_db.getTableUnlocked(to_table_name);
if (dictionary && !other_table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
other_table_new_id = {database_name, table_name, other_table->getStorageID().uuid};
@ -294,7 +295,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
auto table_data_path = getTableDataPath(query);
try
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
if (query.getDatabase() != database_name)
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`",
database_name, query.getDatabase());
@ -312,7 +313,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
attachTableUnlocked(query.getTable(), table, lock); /// Should never throw
attachTableUnlocked(query.getTable(), table); /// Should never throw
table_name_to_path.emplace(query.getTable(), table_data_path);
}
catch (...)
@ -330,8 +331,8 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
bool check_file_exists = true;
SCOPE_EXIT({ std::error_code code; if (check_file_exists) std::filesystem::remove(table_metadata_tmp_path, code); });
std::unique_lock lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name, lock)->getStorageID();
std::lock_guard lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name)->getStorageID();
if (table_id.uuid != actual_table_id.uuid)
throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER);
@ -363,7 +364,7 @@ void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
void DatabaseAtomic::setDetachedTableNotInUseForce(const UUID & uuid)
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
detached_tables.erase(uuid);
}

View File

@ -35,7 +35,7 @@ public:
bool exchange,
bool dictionary) override;
void dropTable(ContextPtr context, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & name) override;
@ -70,9 +70,9 @@ protected:
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) override;
void assertDetachedTableNotInUse(const UUID & uuid);
void assertDetachedTableNotInUse(const UUID & uuid) TSA_REQUIRES(mutex);
using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables();
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
void tryCreateMetadataSymlink();
@ -80,9 +80,9 @@ protected:
//TODO store path in DatabaseWithOwnTables::tables
using NameToPathMap = std::unordered_map<String, String>;
NameToPathMap table_name_to_path;
NameToPathMap table_name_to_path TSA_GUARDED_BY(mutex);
DetachedTables detached_tables;
DetachedTables detached_tables TSA_GUARDED_BY(mutex);
String path_to_table_symlinks;
String path_to_metadata_symlink;
const UUID db_uuid;

View File

@ -62,36 +62,19 @@ namespace ErrorCodes
DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & metadata_path, ContextPtr context)
{
bool created = false;
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
try
{
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context);
/// Before 20.7 it's possible that .sql metadata file does not exist for some old database.
/// In this case Ordinary database is created on server startup if the corresponding metadata directory exists.
/// So we should remove metadata directory if database creation failed.
/// TODO remove this code
created = fs::create_directory(metadata_path);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Database, impl->getEngineName());
DatabasePtr impl = getImpl(create, metadata_path, context);
/// Attach database metadata
if (impl && create.comment)
impl->setDatabaseComment(create.comment->as<ASTLiteral>()->value.safeGet<String>());
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)
context->getQueryContext()->addQueryFactoriesInfo(Context::QueryLogFactories::Database, impl->getEngineName());
// Attach database metadata
if (impl && create.comment)
impl->setDatabaseComment(create.comment->as<ASTLiteral>()->value.safeGet<String>());
return impl;
}
catch (...)
{
if (created && fs::exists(metadata_path))
fs::remove_all(metadata_path);
throw;
}
return impl;
}
template <typename ValueType>
@ -139,8 +122,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Database engine `{}` cannot have table overrides", engine_name);
if (engine_name == "Ordinary")
{
if (!create.attach && !context->getSettingsRef().allow_deprecated_database_ordinary)
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE,
"Ordinary database engine is deprecated (see also allow_deprecated_database_ordinary setting)");
return std::make_shared<DatabaseOrdinary>(database_name, metadata_path, context);
else if (engine_name == "Atomic")
}
if (engine_name == "Atomic")
return std::make_shared<DatabaseAtomic>(database_name, metadata_path, uuid, context);
else if (engine_name == "Memory")
return std::make_shared<DatabaseMemory>(database_name, context);

View File

@ -77,10 +77,10 @@ void DatabaseLazy::createTable(
void DatabaseLazy::dropTable(
ContextPtr local_context,
const String & table_name,
bool no_delay)
bool sync)
{
SCOPE_EXIT_MEMORY_SAFE({ clearExpiredTables(); });
DatabaseOnDisk::dropTable(local_context, table_name, no_delay);
DatabaseOnDisk::dropTable(local_context, table_name, sync);
}
void DatabaseLazy::renameTable(
@ -158,6 +158,7 @@ DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const Filt
bool DatabaseLazy::empty() const
{
std::lock_guard lock(mutex);
return tables_cache.empty();
}

View File

@ -37,7 +37,7 @@ public:
void dropTable(
ContextPtr context,
const String & table_name,
bool no_delay) override;
bool sync) override;
void renameTable(
ContextPtr context,
@ -102,8 +102,8 @@ private:
const time_t expiration_time;
/// TODO use DatabaseWithOwnTablesBase::tables
mutable TablesCache tables_cache;
mutable CacheExpirationQueue cache_expiration_queue;
mutable TablesCache tables_cache TSA_GUARDED_BY(mutex);
mutable CacheExpirationQueue cache_expiration_queue TSA_GUARDED_BY(mutex);
StoragePtr loadTable(const String & table_name) const;

View File

@ -32,8 +32,8 @@ void DatabaseMemory::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
std::unique_lock lock{mutex};
attachTableUnlocked(table_name, table, lock);
std::lock_guard lock{mutex};
attachTableUnlocked(table_name, table);
/// Clean the query from temporary flags.
ASTPtr query_to_store = query;
@ -52,23 +52,24 @@ void DatabaseMemory::createTable(
void DatabaseMemory::dropTable(
ContextPtr /*context*/,
const String & table_name,
bool /*no_delay*/)
bool /*sync*/)
{
std::unique_lock lock{mutex};
auto table = detachTableUnlocked(table_name, lock);
StoragePtr table;
{
std::lock_guard lock{mutex};
table = detachTableUnlocked(table_name);
}
try
{
/// Remove table without lock since:
/// - it does not require it
/// - it may cause lock-order-inversion if underlying storage need to
/// resolve tables (like StorageLiveView)
SCOPE_EXIT(lock.lock());
lock.unlock();
table->drop();
if (table->storesDataOnDisk())
{
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
assert(getDatabaseName() != DatabaseCatalog::TEMPORARY_DATABASE);
fs::path table_data_dir{getTableDataPath(table_name)};
if (fs::exists(table_data_dir))
fs::remove_all(table_data_dir);
@ -76,10 +77,13 @@ void DatabaseMemory::dropTable(
}
catch (...)
{
std::lock_guard lock{mutex};
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
attachTableUnlocked(table_name, table, lock);
attachTableUnlocked(table_name, table);
throw;
}
std::lock_guard lock{mutex};
table->is_dropped = true;
create_queries.erase(table_name);
UUID table_uuid = table->getStorageID().uuid;

View File

@ -32,7 +32,7 @@ public:
void dropTable(
ContextPtr context,
const String & table_name,
bool no_delay) override;
bool sync) override;
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
ASTPtr getCreateDatabaseQuery() const override;
@ -51,9 +51,9 @@ public:
void alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
private:
String data_path;
const String data_path;
using NameToASTCreate = std::unordered_map<String, ASTPtr>;
NameToASTCreate create_queries;
NameToASTCreate create_queries TSA_GUARDED_BY(mutex);
};
}

View File

@ -281,7 +281,7 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
}
}
void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*no_delay*/)
void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
{
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop = table_metadata_path + drop_suffix;
@ -321,11 +321,11 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_name) const
{
std::unique_lock lock(mutex);
checkMetadataFilenameAvailabilityUnlocked(to_table_name, lock);
std::lock_guard lock(mutex);
checkMetadataFilenameAvailabilityUnlocked(to_table_name);
}
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const
{
String table_metadata_path = getObjectMetadataPath(to_table_name);
@ -503,7 +503,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
void DatabaseOnDisk::drop(ContextPtr local_context)
{
assert(tables.empty());
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
{
fs::remove_all(local_context->getPath() + getDataPath());
@ -725,8 +725,6 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromStorage(const String & table_name, cons
void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context)
{
std::lock_guard lock(modify_settings_mutex);
auto create_query = getCreateDatabaseQuery()->clone();
auto * create = create_query->as<ASTCreateQuery>();
auto * settings = create->storage->settings;
@ -759,7 +757,7 @@ void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_cha
writeChar('\n', statement_buf);
String statement = statement_buf.str();
String database_name_escaped = escapeForFileName(database_name);
String database_name_escaped = escapeForFileName(TSA_SUPPRESS_WARNING_FOR_READ(database_name)); /// FIXME
fs::path metadata_root_path = fs::canonical(query_context->getGlobalContext()->getPath());
fs::path metadata_file_tmp_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql.tmp");
fs::path metadata_file_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql");

View File

@ -43,7 +43,7 @@ public:
void dropTable(
ContextPtr context,
const String & table_name,
bool no_delay) override;
bool sync) override;
void renameTable(
ContextPtr context,
@ -70,7 +70,7 @@ public:
/// will throw when the table we want to attach already exists (in active / detached / detached permanently form)
void checkMetadataFilenameAvailability(const String & to_table_name) const override;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const TSA_REQUIRES(mutex);
void modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context);
@ -99,9 +99,6 @@ protected:
const String metadata_path;
const String data_path;
/// For alter settings.
std::mutex modify_settings_mutex;
};
}

View File

@ -174,7 +174,8 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
if (ast)
{
auto * create_query = ast->as<ASTCreateQuery>();
create_query->setDatabase(database_name);
/// NOTE No concurrent writes are possible during database loading
create_query->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name));
/// Even if we don't load the table we can still mark the uuid of it as taken.
if (create_query->uuid != UUIDHelpers::Nil)
@ -201,7 +202,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
return;
}
QualifiedTableName qualified_name{database_name, create_query->getTable()};
QualifiedTableName qualified_name{TSA_SUPPRESS_WARNING_FOR_READ(database_name), create_query->getTable()};
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext(), qualified_name, ast);
std::lock_guard lock{metadata.mutex};
@ -234,12 +235,12 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
size_t tables_in_database = objects_in_database - dictionaries_in_database;
LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
database_name, tables_in_database, dictionaries_in_database);
TSA_SUPPRESS_WARNING_FOR_READ(database_name), tables_in_database, dictionaries_in_database);
}
void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore)
{
assert(name.database == database_name);
assert(name.database == TSA_SUPPRESS_WARNING_FOR_READ(database_name));
const auto & create_query = ast->as<const ASTCreateQuery &>();
tryAttachTable(
@ -255,7 +256,8 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_rest
{
LOG_INFO(log, "Starting up tables.");
const size_t total_tables = tables.size();
/// NOTE No concurrent writes are possible during database loading
const size_t total_tables = TSA_SUPPRESS_WARNING_FOR_READ(tables).size();
if (!total_tables)
return;
@ -271,7 +273,7 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_rest
try
{
for (const auto & table : tables)
for (const auto & table : TSA_SUPPRESS_WARNING_FOR_READ(tables))
thread_pool.scheduleOrThrowOnError([&]() { startup_one_table(table.second); });
}
catch (...)

View File

@ -148,7 +148,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
if (hosts.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", database_name);
"or if the last replica was just dropped or due to logical error", zookeeper_path);
Int32 cversion = stat.cversion;
::sort(hosts.begin(), hosts.end());
@ -213,7 +213,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
treat_local_port_as_remote,
cluster_auth_info.cluster_secure_connection,
/*priority=*/1,
database_name,
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
cluster_auth_info.cluster_secret);
}
@ -588,7 +588,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
query_context->makeQueryContext();
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setCurrentDatabase(database_name);
query_context->setCurrentDatabase(getDatabaseName());
query_context->setCurrentQueryId("");
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false, "");
query_context->initZooKeeperMetadataTransaction(txn);
@ -608,6 +608,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
/// and make possible creation of new table with the same UUID.
String query = fmt::format("CREATE DATABASE IF NOT EXISTS {} ENGINE=Ordinary", backQuoteIfNeed(to_db_name));
auto query_context = Context::createCopy(getContext());
query_context->setSetting("allow_deprecated_database_ordinary", 1);
executeQuery(query, query_context, true);
/// But we want to avoid discarding UUID of ReplicatedMergeTree tables, because it will not work
@ -811,7 +812,7 @@ void DatabaseReplicated::shutdown()
}
void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
@ -820,7 +821,7 @@ void DatabaseReplicated::dropTable(ContextPtr local_context, const String & tabl
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
}
DatabaseAtomic::dropTable(local_context, table_name, no_delay);
DatabaseAtomic::dropTable(local_context, table_name, sync);
}
void DatabaseReplicated::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,

View File

@ -30,7 +30,7 @@ public:
String getEngineName() const override { return "Replicated"; }
/// If current query is initial, then the following methods add metadata updating ZooKeeper operations to current ZooKeeperMetadataTransaction.
void dropTable(ContextPtr, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void renameTable(ContextPtr context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,

View File

@ -218,11 +218,11 @@ bool DatabaseWithOwnTablesBase::empty() const
StoragePtr DatabaseWithOwnTablesBase::detachTable(ContextPtr /* context_ */, const String & table_name)
{
std::unique_lock lock(mutex);
return detachTableUnlocked(table_name, lock);
std::lock_guard lock(mutex);
return detachTableUnlocked(table_name);
}
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &)
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name)
{
StoragePtr res;
@ -245,11 +245,11 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::unique_lock lock(mutex);
attachTableUnlocked(table_name, table, lock);
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> &)
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table)
{
auto table_id = table->getStorageID();
if (table_id.database_name != database_name)
@ -313,7 +313,7 @@ DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
}
}
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &) const
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name) const
{
auto it = tables.find(table_name);
if (it != tables.end())

View File

@ -45,14 +45,14 @@ public:
~DatabaseWithOwnTablesBase() override;
protected:
Tables tables;
Tables tables TSA_GUARDED_BY(mutex);
Poco::Logger * log;
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
void attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> & lock);
StoragePtr detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock);
StoragePtr getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock) const;
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
};
}

View File

@ -19,7 +19,7 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
{
if (auto storage = tryGetTable(name, context))
return storage;
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(database_name), backQuoteIfNeed(name));
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
}
ASTPtr IDatabase::getCreateDatabaseQueryForBackup() const

View File

@ -198,7 +198,7 @@ public:
virtual void dropTable( /// NOLINT
ContextPtr /*context*/,
const String & /*name*/,
[[maybe_unused]] bool no_delay = false)
[[maybe_unused]] bool sync = false)
{
throw Exception("There is no DROP TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
}
@ -356,8 +356,8 @@ protected:
}
mutable std::mutex mutex;
String database_name;
String comment;
String database_name TSA_GUARDED_BY(mutex);
String comment TSA_GUARDED_BY(mutex);
};
using DatabasePtr = std::shared_ptr<IDatabase>;

View File

@ -80,10 +80,10 @@ void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String &
DatabaseAtomic::createTable(context_, name, table, query);
}
void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & name, bool no_delay)
void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & name, bool sync)
{
checkIsInternalQuery(context_, "DROP TABLE");
DatabaseAtomic::dropTable(context_, name, no_delay);
DatabaseAtomic::dropTable(context_, name, sync);
}
void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)

View File

@ -52,7 +52,7 @@ public:
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;
void dropTable(ContextPtr context_, const String & name, bool no_delay) override;
void dropTable(ContextPtr context_, const String & name, bool sync) override;
void attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) override;

View File

@ -447,7 +447,7 @@ void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name
table_iter->second.second->is_dropped = true;
}
void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*no_delay*/)
void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
{
detachTablePermanently(local_context, table_name);
}

View File

@ -82,7 +82,7 @@ public:
void detachTablePermanently(ContextPtr context, const String & table_name) override;
void dropTable(ContextPtr context, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
@ -109,15 +109,15 @@ private:
void cleanOutdatedTables();
void fetchTablesIntoLocalCache(ContextPtr context) const;
void fetchTablesIntoLocalCache(ContextPtr context) const TSA_REQUIRES(mutex);
std::map<String, UInt64> fetchTablesWithModificationTime(ContextPtr local_context) const;
std::map<String, ColumnsDescription> fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr context) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const TSA_REQUIRES(mutex);
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, ContextPtr context) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, ContextPtr context) const TSA_REQUIRES(mutex);
ThreadFromGlobalPool thread;
};

View File

@ -63,9 +63,9 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
return;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
/* replication_identifier */ TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
remote_database_name,
database_name,
TSA_SUPPRESS_WARNING_FOR_READ(database_name), /// FIXME
connection_info,
getContext(),
is_attach,
@ -99,7 +99,8 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
else
{
/// Nested table does not exist and will be created by replication thread.
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
/// FIXME TSA
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(TSA_SUPPRESS_WARNING_FOR_READ(database_name), table_name), getContext(), remote_database_name, table_name);
}
/// Cache MaterializedPostgreSQL wrapper over nested table.
@ -210,7 +211,8 @@ ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & ta
std::lock_guard lock(handler_mutex);
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
/// FIXME TSA
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(TSA_SUPPRESS_WARNING_FOR_READ(database_name), table_name), getContext(), remote_database_name, table_name);
auto ast_storage = replication_handler->getCreateNestedTableQuery(storage.get(), table_name);
assert_cast<ASTCreateQuery *>(ast_storage.get())->uuid = UUIDHelpers::generateV4();
return ast_storage;
@ -234,7 +236,7 @@ ASTPtr DatabaseMaterializedPostgreSQL::createAlterSettingsQuery(const SettingCha
auto * alter = query->as<ASTAlterQuery>();
alter->alter_object = ASTAlterQuery::AlterObjectType::DATABASE;
alter->setDatabase(database_name);
alter->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name)); /// FIXME
alter->set(alter->command_list, command_list);
return query;
@ -390,10 +392,10 @@ void DatabaseMaterializedPostgreSQL::stopReplication()
}
void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{
/// Modify context into nested_context and pass query to Atomic database.
DatabaseAtomic::dropTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, no_delay);
DatabaseAtomic::dropTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, sync);
}

View File

@ -55,7 +55,7 @@ public:
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void dropTable(ContextPtr local_context, const String & name, bool no_delay) override;
void dropTable(ContextPtr local_context, const String & name, bool sync) override;
void drop(ContextPtr local_context) override;

View File

@ -264,7 +264,7 @@ void DatabasePostgreSQL::createTable(ContextPtr local_context, const String & ta
}
void DatabasePostgreSQL::dropTable(ContextPtr, const String & table_name, bool /* no_delay */)
void DatabasePostgreSQL::dropTable(ContextPtr, const String & table_name, bool /* sync */)
{
std::lock_guard<std::mutex> lock{mutex};
@ -369,7 +369,11 @@ ASTPtr DatabasePostgreSQL::getCreateDatabaseQuery() const
ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
StoragePtr storage;
{
std::lock_guard lock{mutex};
storage = fetchTable(table_name, local_context, false);
}
if (!storage)
{
if (throw_on_error)

View File

@ -53,7 +53,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(ContextPtr, const String & table_name, bool no_delay) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
@ -81,7 +81,7 @@ private:
bool checkPostgresTable(const String & table_name) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const TSA_REQUIRES(mutex);
void removeOutdatedTables();

View File

@ -173,12 +173,16 @@ ASTPtr DatabaseSQLite::getCreateDatabaseQuery() const
ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
StoragePtr storage;
{
std::lock_guard<std::mutex> lock(mutex);
storage = fetchTable(table_name, local_context, false);
}
if (!storage)
{
if (throw_on_error)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "SQLite table {}.{} does not exist",
database_name, table_name);
getDatabaseName(), table_name);
return nullptr;
}
auto table_storage_define = database_engine_define->clone();

View File

@ -54,9 +54,9 @@ private:
bool checkSQLiteTable(const String & table_name) const;
NameSet fetchTablesList() const;
NameSet fetchTablesList() const TSA_REQUIRES(mutex);
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const TSA_REQUIRES(mutex);
};

View File

@ -1,7 +1,6 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Common/RWLock.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>

View File

@ -124,7 +124,7 @@ protected:
std::string queue_dir; /// dir with queue of queries
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
/// Save state of executed task to avoid duplicate execution on ZK error
std::optional<String> last_skipped_entry_name;

View File

@ -205,7 +205,10 @@ void DatabaseCatalog::shutdownImpl()
for (auto & database : current_databases)
database.second->shutdown();
tables_marked_dropped.clear();
{
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.clear();
}
std::lock_guard lock(databases_mutex);
for (const auto & db : databases)
@ -223,6 +226,7 @@ void DatabaseCatalog::shutdownImpl()
auto & table = mapping.second.second;
return db || table;
};
std::lock_guard map_lock{elem.mutex};
auto it = std::find_if(elem.map.begin(), elem.map.end(), not_empty_mapping);
return it != elem.map.end();
}) == uuid_map.end());
@ -689,7 +693,8 @@ DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID &
DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & table)
{
std::unique_lock lock(ddl_guards_mutex);
auto db_guard_iter = ddl_guards.try_emplace(database).first;
/// TSA does not support unique_lock
auto db_guard_iter = TSA_SUPPRESS_WARNING_FOR_WRITE(ddl_guards).try_emplace(database).first;
DatabaseGuard & db_guard = db_guard_iter->second;
return std::make_unique<DDLGuard>(db_guard.first, db_guard.second, std::move(lock), table, database);
}
@ -999,7 +1004,7 @@ void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
LOG_DEBUG(log, "Waiting for table {} to be finally dropped", toString(uuid));
std::unique_lock lock{tables_marked_dropped_mutex};
wait_table_finally_dropped.wait(lock, [&]()
wait_table_finally_dropped.wait(lock, [&]() TSA_REQUIRES(tables_marked_dropped_mutex) -> bool
{
return !tables_marked_dropped_ids.contains(uuid);
});

View File

@ -221,7 +221,7 @@ public:
DependenciesInfo getLoadingDependenciesInfo(const StorageID & table_id) const;
TableNamesSet tryRemoveLoadingDependencies(const StorageID & table_id, bool check_dependencies, bool is_drop_database = false);
TableNamesSet tryRemoveLoadingDependenciesUnlocked(const QualifiedTableName & removing_table, bool check_dependencies, bool is_drop_database = false);
TableNamesSet tryRemoveLoadingDependenciesUnlocked(const QualifiedTableName & removing_table, bool check_dependencies, bool is_drop_database = false) TSA_REQUIRES(databases_mutex);
void checkTableCanBeRemovedOrRenamed(const StorageID & table_id) const;
void updateLoadingDependencies(const StorageID & table_id, TableNamesSet && new_dependencies);
@ -233,15 +233,15 @@ private:
static std::unique_ptr<DatabaseCatalog> database_catalog;
explicit DatabaseCatalog(ContextMutablePtr global_context_);
void assertDatabaseExistsUnlocked(const String & database_name) const;
void assertDatabaseDoesntExistUnlocked(const String & database_name) const;
void assertDatabaseExistsUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex);
void assertDatabaseDoesntExistUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex);
void shutdownImpl();
struct UUIDToStorageMapPart
{
std::unordered_map<UUID, DatabaseAndTable> map;
std::unordered_map<UUID, DatabaseAndTable> map TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};
@ -273,12 +273,12 @@ private:
mutable std::mutex databases_mutex;
ViewDependencies view_dependencies;
ViewDependencies view_dependencies TSA_GUARDED_BY(databases_mutex);
Databases databases;
Databases databases TSA_GUARDED_BY(databases_mutex);
UUIDToStorageMap uuid_map;
DependenciesInfos loading_dependencies;
DependenciesInfos loading_dependencies TSA_GUARDED_BY(databases_mutex);
Poco::Logger * log;
@ -290,12 +290,12 @@ private:
/// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below.
using DatabaseGuard = std::pair<DDLGuard::Map, std::shared_mutex>;
using DDLGuards = std::map<String, DatabaseGuard>;
DDLGuards ddl_guards;
DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex);
/// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.
mutable std::mutex ddl_guards_mutex;
TablesMarkedAsDropped tables_marked_dropped;
std::unordered_set<UUID> tables_marked_dropped_ids;
TablesMarkedAsDropped tables_marked_dropped TSA_GUARDED_BY(tables_marked_dropped_mutex);
std::unordered_set<UUID> tables_marked_dropped_ids TSA_GUARDED_BY(tables_marked_dropped_mutex);
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;

View File

@ -16,7 +16,7 @@
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/RWLock.h>
#include <Storages/TableLockHolder.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
@ -339,7 +339,7 @@ public:
/// We keep correspondence between used_flags and hash table internal buffer.
/// Hash table cannot be modified during HashJoin lifetime and must be protected with lock.
void setLock(RWLockImpl::LockHolder rwlock_holder)
void setLock(TableLockHolder rwlock_holder)
{
storage_join_lock = rwlock_holder;
}
@ -394,7 +394,7 @@ private:
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
RWLockImpl::LockHolder storage_join_lock = nullptr;
TableLockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);

View File

@ -0,0 +1,77 @@
#include <Interpreters/InterpreterCreateIndexQuery.h>
#include <Access/ContextAccess.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Storages/AlterCommands.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
}
BlockIO InterpreterCreateIndexQuery::execute()
{
const auto & create_index = query_ptr->as<ASTCreateIndexQuery &>();
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_ADD_INDEX, create_index.getDatabase(), create_index.getTable());
if (!create_index.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(required_access);
auto table_id = getContext()->resolveStorageID(create_index, Context::ResolveOrdinary);
query_ptr->as<ASTCreateIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
/// Convert ASTCreateIndexQuery to AlterCommand.
AlterCommands alter_commands;
AlterCommand command;
command.index_decl = create_index.index_decl;
command.type = AlterCommand::ADD_INDEX;
command.index_name = create_index.index_name->as<ASTIdentifier &>().name();
command.if_not_exists = create_index.if_not_exists;
/// Fill name in ASTIndexDeclaration
auto & ast_index_decl = command.index_decl->as<ASTIndexDeclaration &>();
ast_index_decl.name = command.index_name;
alter_commands.emplace_back(std::move(command));
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(table, getContext());
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, getContext());
table->alter(alter_commands, getContext(), alter_lock);
return {};
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
class InterpreterCreateIndexQuery : public IInterpreter, WithContext
{
public:
InterpreterCreateIndexQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
};
}

View File

@ -150,10 +150,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
/// When attaching old-style database during server startup, we must always use Ordinary engine
if (create.attach)
throw Exception("Database engine must be specified for ATTACH DATABASE query", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
bool old_style_database = getContext()->getSettingsRef().default_database_engine.value == DefaultDatabaseEngine::Ordinary;
auto engine = std::make_shared<ASTFunction>();
auto storage = std::make_shared<ASTStorage>();
engine->name = old_style_database ? "Ordinary" : "Atomic";
engine->name = "Atomic";
engine->no_empty_args = true;
storage->set(storage->engine, engine);
create.set(create.storage, storage);
@ -196,8 +195,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (create_from_user)
{
const auto & default_engine = getContext()->getSettingsRef().default_database_engine.value;
if (create.uuid == UUIDHelpers::Nil && default_engine == DefaultDatabaseEngine::Atomic)
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4(); /// Will enable Atomic engine for nested database
}
else if (attach_from_user && create.uuid == UUIDHelpers::Nil)

View File

@ -0,0 +1,71 @@
#include <Access/ContextAccess.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/InterpreterDropIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/AlterCommands.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
}
BlockIO InterpreterDropIndexQuery::execute()
{
const auto & drop_index = query_ptr->as<ASTDropIndexQuery &>();
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_DROP_INDEX, drop_index.getDatabase(), drop_index.getTable());
if (!drop_index.cluster.empty())
{
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(required_access);
auto table_id = getContext()->resolveStorageID(drop_index, Context::ResolveOrdinary);
query_ptr->as<ASTDropIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
/// Convert ASTDropIndexQuery to AlterCommand.
AlterCommands alter_commands;
AlterCommand command;
command.ast = drop_index.convertToASTAlterCommand();
command.type = AlterCommand::DROP_INDEX;
command.index_name = drop_index.index_name->as<ASTIdentifier &>().name();
command.if_exists = drop_index.if_exists;
alter_commands.emplace_back(std::move(command));
auto alter_lock = table->lockForAlter(getContext()->getSettingsRef().lock_acquire_timeout);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(table, getContext());
alter_commands.prepare(metadata);
table->checkAlterIsPossible(alter_commands, getContext());
table->alter(alter_commands, getContext(), alter_lock);
return {};
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
class InterpreterDropIndexQuery : public IInterpreter, WithContext
{
public:
InterpreterDropIndexQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
};
}

View File

@ -62,7 +62,7 @@ BlockIO InterpreterDropQuery::execute()
}
if (getContext()->getSettingsRef().database_atomic_wait_for_drop_and_detach_synchronously)
drop.no_delay = true;
drop.sync = true;
if (drop.table)
return executeToTable(drop);
@ -89,7 +89,7 @@ BlockIO InterpreterDropQuery::executeToTable(ASTDropQuery & query)
DatabasePtr database;
UUID table_to_wait_on = UUIDHelpers::Nil;
auto res = executeToTableImpl(getContext(), query, database, table_to_wait_on);
if (query.no_delay)
if (query.sync)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_to_wait_on);
return res;
}
@ -244,7 +244,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
DatabaseCatalog::instance().tryRemoveLoadingDependencies(table_id, getContext()->getSettingsRef().check_table_dependencies,
is_drop_or_detach_database);
database->dropTable(context_, table_id.table_name, query.no_delay);
database->dropTable(context_, table_id.table_name, query.sync);
}
db = database;
@ -300,7 +300,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
}
catch (...)
{
if (query.no_delay)
if (query.sync)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
@ -308,7 +308,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
throw;
}
if (query.no_delay)
if (query.sync)
{
for (const auto & table_uuid : tables_to_wait)
waitForTableToBeActuallyDroppedOrDetached(query, database, table_uuid);
@ -345,7 +345,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
query_for_table.kind = query.kind;
query_for_table.if_exists = true;
query_for_table.setDatabase(database_name);
query_for_table.no_delay = query.no_delay;
query_for_table.sync = query.sync;
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
/// since in this case getTablesIterator() may do some additional work,
@ -368,7 +368,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
}
}
if (!drop && query.no_delay)
if (!drop && query.sync)
{
/// Avoid "some tables are still in use" when sync mode is enabled
for (const auto & table_uuid : uuids_to_wait)
@ -428,7 +428,7 @@ void InterpreterDropQuery::extendQueryLogElemImpl(QueryLogElement & elem, const
elem.query_kind = "Drop";
}
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool no_delay)
void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool sync)
{
if (DatabaseCatalog::instance().tryGetTable(target_table_id, current_context))
{
@ -437,7 +437,7 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr
drop_query->setDatabase(target_table_id.database_name);
drop_query->setTable(target_table_id.table_name);
drop_query->kind = kind;
drop_query->no_delay = no_delay;
drop_query->sync = sync;
drop_query->if_exists = true;
ASTPtr ast_drop_query = drop_query;
/// FIXME We have to use global context to execute DROP query for inner table

View File

@ -26,7 +26,7 @@ public:
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr &, ContextPtr) const override;
static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool no_delay);
static void executeDropQuery(ASTDropQuery::Kind kind, ContextPtr global_context, ContextPtr current_context, const StorageID & target_table_id, bool sync);
private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const;

View File

@ -3,7 +3,9 @@
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/ASTInsertQuery.h>
@ -42,10 +44,12 @@
#include <Interpreters/InterpreterBackupQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Interpreters/InterpreterCreateIndexQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/InterpreterDescribeCacheQuery.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/InterpreterDropIndexQuery.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterExplainQuery.h>
@ -298,6 +302,14 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
{
return std::make_unique<InterpreterDropFunctionQuery>(query, context);
}
else if (query->as<ASTCreateIndexQuery>())
{
return std::make_unique<InterpreterCreateIndexQuery>(query, context);
}
else if (query->as<ASTDropIndexQuery>())
{
return std::make_unique<InterpreterDropIndexQuery>(query, context);
}
else if (query->as<ASTBackupQuery>())
{
return std::make_unique<InterpreterBackupQuery>(query, context);

View File

@ -1242,8 +1242,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (expressions.has_order_by)
executeOrder(query_plan, input_order_info_for_order);
if (expressions.has_order_by && query.limitLength())
executeDistinct(query_plan, false, expressions.selected_columns, true);
/// pre_distinct = false, because if we have limit and distinct,
/// we need to merge streams to one and calculate overall distinct.
/// Otherwise we can take several equal values from different streams
/// according to limit and skip some distinct values.
if (query.limitLength())
executeDistinct(query_plan, false, expressions.selected_columns, false);
if (expressions.hasLimitBy())
{

View File

@ -52,10 +52,6 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
if (!num_children)
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
/// This is required for UNION to match headers correctly.
if (num_children > 1)
options.reorderColumns();
/// Note that we pass 'required_result_column_names' to first SELECT.
/// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT,
/// because names could be different.
@ -163,7 +159,22 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
{
Blocks headers(num_children);
for (size_t query_num = 0; query_num < num_children; ++query_num)
{
headers[query_num] = nested_interpreters[query_num]->getSampleBlock();
const auto & current_required_result_column_names = required_result_column_names_for_other_selects[query_num];
if (!current_required_result_column_names.empty())
{
const auto & header_columns = headers[query_num].getNames();
if (current_required_result_column_names != header_columns)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Different order of columns in UNION subquery: {} and {}",
fmt::join(current_required_result_column_names, ", "),
fmt::join(header_columns, ", "));
}
}
}
result_header = getCommonHeaderForUnion(headers);
}

View File

@ -24,16 +24,17 @@ static TableLockHolder getLockForOrdinary(const StoragePtr & storage)
return storage->lockForShare(RWLockImpl::NO_QUERY, default_timeout);
}
MergeTreeTransaction::MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id)
MergeTreeTransaction::MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id, std::list<CSN>::iterator snapshot_it_)
: tid({snapshot_, local_tid_, host_id})
, snapshot(snapshot_)
, snapshot_in_use_it(snapshot_it_)
, csn(Tx::UnknownCSN)
{
}
void MergeTreeTransaction::setSnapshot(CSN new_snapshot)
{
snapshot = new_snapshot;
snapshot.store(new_snapshot, std::memory_order_relaxed);
}
MergeTreeTransaction::State MergeTreeTransaction::getState() const
@ -219,19 +220,31 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
/// It's not a problem if server crash before CSN is written, because we already have TID in data part and entry in the log.
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
chassert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
DataPartsVector created_parts;
DataPartsVector removed_parts;
RunningMutationsList committed_mutations;
{
/// We don't really need mutex here, because no concurrent modifications of transaction object may happen after commit.
std::lock_guard lock{mutex};
created_parts = creating_parts;
removed_parts = removing_parts;
committed_mutations = mutations;
}
for (const auto & part : created_parts)
{
part->version.creation_csn.store(csn);
part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::CREATION);
}
for (const auto & part : removing_parts)
for (const auto & part : removed_parts)
{
part->version.removal_csn.store(csn);
part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::REMOVAL);
}
for (const auto & storage_and_mutation : mutations)
for (const auto & storage_and_mutation : committed_mutations)
storage_and_mutation.first->setMutationCSN(storage_and_mutation.second, csn);
}
@ -313,7 +326,7 @@ void MergeTreeTransaction::onException()
String MergeTreeTransaction::dumpDescription() const
{
String res = fmt::format("{} state: {}, snapshot: {}", tid, getState(), snapshot);
String res = fmt::format("{} state: {}, snapshot: {}", tid, getState(), getSnapshot());
if (isReadOnly())
{
@ -335,7 +348,7 @@ String MergeTreeTransaction::dumpDescription() const
{
String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn);
std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info));
chassert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
chassert(!part->version.creation_csn || part->version.creation_csn <= getSnapshot());
}
for (const auto & mutation : mutations)

View File

@ -31,13 +31,13 @@ public:
ROLLED_BACK,
};
CSN getSnapshot() const { return snapshot; }
CSN getSnapshot() const { return snapshot.load(std::memory_order_relaxed); }
void setSnapshot(CSN new_snapshot);
State getState() const;
const TransactionID tid;
MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id);
MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id, std::list<CSN>::iterator snapshot_it_);
void addNewPart(const StoragePtr & storage, const DataPartPtr & new_part);
void removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, const TransactionInfoContext & context);
@ -71,16 +71,16 @@ private:
Stopwatch elapsed;
/// Usually it's equal to tid.start_csn, but can be changed by SET SNAPSHOT query (for introspection purposes and time-traveling)
CSN snapshot;
std::list<CSN>::iterator snapshot_in_use_it;
std::atomic<CSN> snapshot;
const std::list<CSN>::iterator snapshot_in_use_it;
/// Lists of changes made by transaction
std::unordered_set<StoragePtr> storages;
std::vector<TableLockHolder> table_read_locks_for_ordinary_db;
DataPartsVector creating_parts;
DataPartsVector removing_parts;
std::unordered_set<StoragePtr> storages TSA_GUARDED_BY(mutex);
std::vector<TableLockHolder> table_read_locks_for_ordinary_db TSA_GUARDED_BY(mutex);
DataPartsVector creating_parts TSA_GUARDED_BY(mutex);
DataPartsVector removing_parts TSA_GUARDED_BY(mutex);
using RunningMutationsList = std::vector<std::pair<StoragePtr, String>>;
RunningMutationsList mutations;
RunningMutationsList mutations TSA_GUARDED_BY(mutex);
std::atomic<CSN> csn;
};

View File

@ -31,8 +31,6 @@ struct SelectQueryOptions
bool only_analyze = false;
bool modify_inplace = false;
bool remove_duplicates = false;
/// This is required for UNION to match headers correctly.
bool reorder_columns_as_required_header = false;
bool ignore_quota = false;
bool ignore_limits = false;
/// This flag is needed to analyze query ignoring table projections.
@ -99,12 +97,6 @@ struct SelectQueryOptions
return *this;
}
SelectQueryOptions & reorderColumns(bool value = true)
{
reorder_columns_as_required_header = value;
return *this;
}
SelectQueryOptions & noSubquery()
{
subquery_depth = 0;

View File

@ -43,16 +43,13 @@ catch (...)
TransactionLog::TransactionLog()
: log(&Poco::Logger::get("TransactionLog"))
: global_context(Context::getGlobalContextInstance())
, log(&Poco::Logger::get("TransactionLog"))
, zookeeper_path(global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn"))
, zookeeper_path_log(zookeeper_path + "/log")
, fault_probability_before_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0))
, fault_probability_after_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0))
{
global_context = Context::getGlobalContextInstance();
global_context->checkTransactionsAreAllowed();
zookeeper_path = global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn");
zookeeper_path_log = zookeeper_path + "/log";
fault_probability_before_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0);
fault_probability_after_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0);
loadLogFromZooKeeper();
updating_thread = ThreadFromGlobalPool(&TransactionLog::runUpdatingThread, this);
@ -128,7 +125,7 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
futures.reserve(entries_count);
for (auto it = beg; it != end; ++it)
futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path_log) / *it));
futures.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
@ -213,7 +210,7 @@ void TransactionLog::runUpdatingThread()
try
{
/// Do not wait if we have some transactions to finalize
if (unknown_state_list_loaded.empty())
if (TSA_READ_ONE_THREAD(unknown_state_list_loaded).empty())
log_updated_event->wait();
if (stop_flag.load())
@ -230,7 +227,7 @@ void TransactionLog::runUpdatingThread()
/// It's possible that we connected to different [Zoo]Keeper instance
/// so we may read a bit stale state.
zookeeper->sync(zookeeper_path_log);
TSA_READ_ONE_THREAD(zookeeper)->sync(zookeeper_path_log);
}
loadNewEntries();
@ -255,13 +252,13 @@ void TransactionLog::runUpdatingThread()
void TransactionLog::loadNewEntries()
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
Strings entries_list = TSA_READ_ONE_THREAD(zookeeper)->getChildren(zookeeper_path_log, nullptr, log_updated_event);
chassert(!entries_list.empty());
::sort(entries_list.begin(), entries_list.end());
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), TSA_READ_ONE_THREAD(last_loaded_entry));
loadEntries(it, entries_list.end());
chassert(last_loaded_entry == entries_list.back());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(TSA_READ_ONE_THREAD(last_loaded_entry) == entries_list.back());
chassert(latest_snapshot == deserializeCSN(TSA_READ_ONE_THREAD(last_loaded_entry)));
latest_snapshot.notify_all();
}
@ -281,7 +278,7 @@ void TransactionLog::removeOldEntries()
/// TODO we will need a bit more complex logic for multiple hosts
Coordination::Stat stat;
CSN old_tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr", &stat));
CSN old_tail_ptr = deserializeCSN(TSA_READ_ONE_THREAD(zookeeper)->get(zookeeper_path + "/tail_ptr", &stat));
CSN new_tail_ptr = getOldestSnapshot();
if (new_tail_ptr < old_tail_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected tail_ptr {}, oldest snapshot is {}, it's a bug", old_tail_ptr, new_tail_ptr);
@ -290,7 +287,7 @@ void TransactionLog::removeOldEntries()
/// (it's not supposed to fail with ZBADVERSION while there is only one host)
LOG_TRACE(log, "Updating tail_ptr from {} to {}", old_tail_ptr, new_tail_ptr);
zookeeper->set(zookeeper_path + "/tail_ptr", serializeCSN(new_tail_ptr), stat.version);
TSA_READ_ONE_THREAD(zookeeper)->set(zookeeper_path + "/tail_ptr", serializeCSN(new_tail_ptr), stat.version);
tail_ptr.store(new_tail_ptr);
/// Now we can find and remove old entries
@ -314,7 +311,7 @@ void TransactionLog::removeOldEntries()
continue;
LOG_TEST(log, "Removing entry {} -> {}", elem.second.tid, elem.second.csn);
auto code = zookeeper->tryRemove(zookeeper_path_log + "/" + serializeCSN(elem.second.csn));
auto code = TSA_READ_ONE_THREAD(zookeeper)->tryRemove(zookeeper_path_log + "/" + serializeCSN(elem.second.csn));
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE)
removed_entries.push_back(elem.first);
}
@ -376,11 +373,11 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
std::lock_guard lock{running_list_mutex};
CSN snapshot = latest_snapshot.load();
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get());
auto snapshot_lock = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get(), snapshot_lock);
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
}
LOG_TEST(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
@ -595,7 +592,7 @@ TransactionLog::TransactionsList TransactionLog::getTransactionsList() const
void TransactionLog::sync() const
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log);
Strings entries_list = getZooKeeper()->getChildren(zookeeper_path_log);
chassert(!entries_list.empty());
::sort(entries_list.begin(), entries_list.end());
CSN newest_csn = deserializeCSN(entries_list.back());

View File

@ -129,7 +129,7 @@ public:
void sync() const;
private:
void loadLogFromZooKeeper();
void loadLogFromZooKeeper() TSA_REQUIRES(mutex);
void runUpdatingThread();
void loadEntries(Strings::const_iterator beg, Strings::const_iterator end);
@ -149,8 +149,8 @@ private:
CSN getCSNImpl(const TIDHash & tid_hash) const;
ContextPtr global_context;
Poco::Logger * log;
const ContextPtr global_context;
Poco::Logger * const log;
/// The newest snapshot available for reading
std::atomic<CSN> latest_snapshot;
@ -167,24 +167,24 @@ private:
TransactionID tid;
};
using TIDMap = std::unordered_map<TIDHash, CSNEntry>;
TIDMap tid_to_csn;
TIDMap tid_to_csn TSA_GUARDED_BY(mutex);
mutable std::mutex running_list_mutex;
/// Transactions that are currently processed
TransactionsList running_list;
TransactionsList running_list TSA_GUARDED_BY(running_list_mutex);
/// If we lost connection on attempt to create csn- node then we don't know transaction's state.
using UnknownStateList = std::vector<std::pair<MergeTreeTransaction *, scope_guard>>;
UnknownStateList unknown_state_list;
UnknownStateList unknown_state_list_loaded;
UnknownStateList unknown_state_list TSA_GUARDED_BY(running_list_mutex);
UnknownStateList unknown_state_list_loaded TSA_GUARDED_BY(running_list_mutex);
/// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup.
std::list<CSN> snapshots_in_use;
std::list<CSN> snapshots_in_use TSA_GUARDED_BY(running_list_mutex);
ZooKeeperPtr zookeeper;
String zookeeper_path;
ZooKeeperPtr zookeeper TSA_GUARDED_BY(mutex);
const String zookeeper_path;
String zookeeper_path_log;
const String zookeeper_path_log;
/// Name of the newest entry that was loaded from log in ZK
String last_loaded_entry;
String last_loaded_entry TSA_GUARDED_BY(mutex);
/// The oldest CSN such that we store in log entries with TransactionIDs containing this CSN.
std::atomic<CSN> tail_ptr = Tx::UnknownCSN;
@ -193,8 +193,8 @@ private:
std::atomic_bool stop_flag = false;
ThreadFromGlobalPool updating_thread;
Float64 fault_probability_before_commit = 0;
Float64 fault_probability_after_commit = 0;
const Float64 fault_probability_before_commit = 0;
const Float64 fault_probability_after_commit = 0;
};
template <typename Derived>

View File

@ -429,21 +429,28 @@ void renameDuplicatedColumns(const ASTSelectQuery * select_query)
/// This is the case when we have DISTINCT or arrayJoin: we require more columns in SELECT even if we need less columns in result.
/// Also we have to remove duplicates in case of GLOBAL subqueries. Their results are placed into tables so duplicates are impossible.
/// Also remove all INTERPOLATE columns which are not in SELECT anymore.
void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const Names & required_result_columns, bool remove_dups, bool reorder_columns_as_required_header)
void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const Names & required_result_columns, bool remove_dups)
{
ASTs & elements = select_query->select()->children;
std::map<String, size_t> required_columns_with_duplicate_count;
std::unordered_map<String, size_t> required_columns_with_duplicate_count;
/// Order of output columns should match order in required_result_columns,
/// otherwise UNION queries may have incorrect header when subselect has duplicated columns.
///
/// NOTE: multimap is required since there can be duplicated column names.
std::unordered_multimap<String, size_t> output_columns_positions;
if (!required_result_columns.empty())
{
/// Some columns may be queried multiple times, like SELECT x, y, y FROM table.
for (const auto & name : required_result_columns)
for (size_t i = 0; i < required_result_columns.size(); ++i)
{
const auto & name = required_result_columns[i];
if (remove_dups)
required_columns_with_duplicate_count[name] = 1;
else
++required_columns_with_duplicate_count[name];
output_columns_positions.emplace(name, i);
}
}
else if (remove_dups)
@ -455,49 +462,44 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
else
return;
ASTs new_elements;
new_elements.reserve(elements.size());
ASTs new_elements(elements.size() + output_columns_positions.size());
size_t new_elements_size = 0;
NameSet remove_columns;
/// Resort columns according to required_result_columns.
if (reorder_columns_as_required_header && !required_result_columns.empty())
{
std::unordered_map<String, size_t> name_pos;
{
size_t pos = 0;
for (const auto & name : required_result_columns)
name_pos[name] = pos++;
}
::sort(elements.begin(), elements.end(), [&](const auto & lhs, const auto & rhs)
{
String lhs_name = lhs->getAliasOrColumnName();
String rhs_name = rhs->getAliasOrColumnName();
size_t lhs_pos = name_pos.size();
size_t rhs_pos = name_pos.size();
if (auto it = name_pos.find(lhs_name); it != name_pos.end())
lhs_pos = it->second;
if (auto it = name_pos.find(rhs_name); it != name_pos.end())
rhs_pos = it->second;
return lhs_pos < rhs_pos;
});
}
for (const auto & elem : elements)
{
String name = elem->getAliasOrColumnName();
/// Columns that are presented in output_columns_positions should
/// appears in the same order in the new_elements, hence default
/// result_index goes after all elements of output_columns_positions
/// (it is for columns that are not located in
/// output_columns_positions, i.e. untuple())
size_t result_index = output_columns_positions.size() + new_elements_size;
/// Note, order of duplicated columns is not important here (since they
/// are the same), only order for unique columns is important, so it is
/// fine to use multimap here.
if (auto it = output_columns_positions.find(name); it != output_columns_positions.end())
{
result_index = it->second;
output_columns_positions.erase(it);
}
auto it = required_columns_with_duplicate_count.find(name);
if (required_columns_with_duplicate_count.end() != it && it->second)
{
new_elements.push_back(elem);
new_elements[result_index] = elem;
--it->second;
++new_elements_size;
}
else if (select_query->distinct || hasArrayJoin(elem))
{
/// ARRAY JOIN cannot be optimized out since it may change number of rows,
/// so as DISTINCT.
new_elements.push_back(elem);
new_elements[result_index] = elem;
++new_elements_size;
}
else
{
@ -508,13 +510,20 @@ void removeUnneededColumnsFromSelectClause(ASTSelectQuery * select_query, const
/// Never remove untuple. It's result column may be in required columns.
/// It is not easy to analyze untuple here, because types were not calculated yet.
if (func && func->name == "untuple")
new_elements.push_back(elem);
{
new_elements[result_index] = elem;
++new_elements_size;
}
/// removing aggregation can change number of rows, so `count()` result in outer sub-query would be wrong
if (func && AggregateUtils::isAggregateFunction(*func) && !select_query->groupBy())
new_elements.push_back(elem);
{
new_elements[result_index] = elem;
++new_elements_size;
}
}
}
/// Remove empty nodes.
std::erase(new_elements, ASTPtr{});
if (select_query->interpolate())
{
@ -1193,7 +1202,6 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
size_t subquery_depth = select_options.subquery_depth;
bool remove_duplicates = select_options.remove_duplicates;
bool reorder_columns_as_required_header = select_options.reorder_columns_as_required_header;
const auto & settings = getContext()->getSettingsRef();
@ -1245,7 +1253,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
/// Must be after 'normalizeTree' (after expanding aliases, for aliases not get lost)
/// and before 'executeScalarSubqueries', 'analyzeAggregation', etc. to avoid excessive calculations.
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates, reorder_columns_as_required_header);
removeUnneededColumnsFromSelectClause(select_query, required_result_columns, remove_duplicates);
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, result.local_scalars, select_options.only_analyze);

View File

@ -9,6 +9,7 @@
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/executeQuery.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/TablesLoader.h>
@ -18,7 +19,6 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <filesystem>
#include <Common/logger_useful.h>
@ -27,6 +27,12 @@ namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
static void executeCreateQuery(
const String & query,
ContextMutablePtr context,
@ -71,12 +77,6 @@ static void loadDatabase(
ReadBufferFromFile in(database_metadata_file, 1024);
readStringUntilEOF(database_attach_query, in);
}
else if (fs::exists(fs::path(database_path)))
{
/// TODO Remove this code (it's required for compatibility with versions older than 20.7)
/// Database exists, but .sql file is absent. It's old-style Ordinary database (e.g. system or default)
database_attach_query = "ATTACH DATABASE " + backQuoteIfNeed(database) + " ENGINE = Ordinary";
}
else
{
/// It's first server run and we need create default and system databases.
@ -95,6 +95,15 @@ static void loadDatabase(
}
}
static void checkUnsupportedVersion(ContextMutablePtr context, const String & database_name)
{
/// Produce better exception message
String metadata_path = context->getPath() + "metadata/" + database_name;
if (fs::exists(fs::path(metadata_path)))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Data directory for {} database exists, but metadata file does not. "
"Probably you are trying to upgrade from version older than 20.7. "
"If so, you should upgrade through intermediate version.", database_name);
}
void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
@ -118,43 +127,33 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
if (it->is_symlink())
continue;
const auto current_file = it->path().filename().string();
if (!it->is_directory())
{
/// TODO: DETACH DATABASE PERMANENTLY ?
if (fs::path(current_file).extension() == ".sql")
{
String db_name = fs::path(current_file).stem();
if (!isSystemOrInformationSchema(db_name))
databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
}
/// Temporary fails may be left from previous server runs.
if (fs::path(current_file).extension() == ".tmp")
{
LOG_WARNING(log, "Removing temporary file {}", it->path().string());
try
{
fs::remove(it->path());
}
catch (...)
{
/// It does not prevent server to startup.
tryLogCurrentException(log);
}
}
if (it->is_directory())
continue;
const auto current_file = it->path().filename().string();
/// TODO: DETACH DATABASE PERMANENTLY ?
if (fs::path(current_file).extension() == ".sql")
{
String db_name = fs::path(current_file).stem();
if (!isSystemOrInformationSchema(db_name))
databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
}
/// For '.svn', '.gitignore' directory and similar.
if (current_file.at(0) == '.')
continue;
if (isSystemOrInformationSchema(current_file))
continue;
databases.emplace(unescapeForFileName(current_file), it->path().string());
/// Temporary fails may be left from previous server runs.
if (fs::path(current_file).extension() == ".tmp")
{
LOG_WARNING(log, "Removing temporary file {}", it->path().string());
try
{
fs::remove(it->path());
}
catch (...)
{
/// It does not prevent server to startup.
tryLogCurrentException(log);
}
}
}
/// clickhouse-local creates DatabaseMemory as default database by itself
@ -162,7 +161,10 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
bool create_default_db_if_not_exists = !default_database_name.empty();
bool metadata_dir_for_default_db_already_exists = databases.contains(default_database_name);
if (create_default_db_if_not_exists && !metadata_dir_for_default_db_already_exists)
{
checkUnsupportedVersion(context, default_database_name);
databases.emplace(default_database_name, std::filesystem::path(path) / escapeForFileName(default_database_name));
}
TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)
@ -192,13 +194,14 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
{
String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql";
if (fs::exists(fs::path(path)) || fs::exists(fs::path(metadata_file)))
if (fs::exists(fs::path(metadata_file)))
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.
loadDatabase(context, database_name, path, true);
}
else
{
checkUnsupportedVersion(context, database_name);
/// Initialize system database manually
String database_create_query = "CREATE DATABASE ";
database_create_query += database_name;
@ -208,6 +211,122 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
}
}
static void convertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const DatabasePtr & database)
{
/// It's kind of C++ script that creates temporary database with Atomic engine,
/// moves all tables to it, drops old database and then renames new one to old name.
Poco::Logger * log = &Poco::Logger::get("loadMetadata");
String name = database->getDatabaseName();
String tmp_name = fmt::format(".tmp_convert.{}.{}", name, thread_local_rng());
String name_quoted = backQuoteIfNeed(name);
String tmp_name_quoted = backQuoteIfNeed(tmp_name);
LOG_INFO(log, "Will convert database {} from Ordinary to Atomic", name_quoted);
String create_database_query = fmt::format("CREATE DATABASE IF NOT EXISTS {}", tmp_name_quoted);
auto res = executeQuery(create_database_query, context, true);
executeTrivialBlockIO(res, context);
res = {};
auto tmp_database = DatabaseCatalog::instance().getDatabase(tmp_name);
assert(tmp_database->getEngineName() == "Atomic");
size_t num_tables = 0;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
++num_tables;
auto id = iterator->table()->getStorageID();
id.database_name = tmp_name;
iterator->table()->checkTableCanBeRenamed(id);
}
LOG_INFO(log, "Will move {} tables to {}", num_tables, tmp_name_quoted);
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
auto id = iterator->table()->getStorageID();
String qualified_quoted_name = id.getFullTableName();
id.database_name = tmp_name;
String tmp_qualified_quoted_name = id.getFullTableName();
String move_table_query = fmt::format("RENAME TABLE {} TO {}", qualified_quoted_name, tmp_qualified_quoted_name);
res = executeQuery(move_table_query, context, true);
executeTrivialBlockIO(res, context);
res = {};
}
LOG_INFO(log, "Moved all tables from {} to {}", name_quoted, tmp_name_quoted);
if (!database->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database {} is not empty after moving tables", name_quoted);
String drop_query = fmt::format("DROP DATABASE {}", name_quoted);
res = executeQuery(drop_query, context, true);
executeTrivialBlockIO(res, context);
res = {};
String rename_query = fmt::format("RENAME DATABASE {} TO {}", tmp_name_quoted, name_quoted);
res = executeQuery(rename_query, context, true);
executeTrivialBlockIO(res, context);
LOG_INFO(log, "Finished database engine conversion of {}", name_quoted);
}
void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const DatabasePtr & database)
{
if (database->getEngineName() != "Ordinary")
return;
if (context->getSettingsRef().allow_deprecated_database_ordinary)
return;
try
{
/// It's not quite correct to run DDL queries while database is not started up.
startupSystemTables();
auto local_context = Context::createCopy(context);
local_context->setSetting("check_table_dependencies", false);
convertOrdinaryDatabaseToAtomic(local_context, database);
auto new_database = DatabaseCatalog::instance().getDatabase(DatabaseCatalog::SYSTEM_DATABASE);
UUID db_uuid = new_database->getUUID();
std::vector<UUID> tables_uuids;
for (auto iterator = new_database->getTablesIterator(context); iterator->isValid(); iterator->next())
tables_uuids.push_back(iterator->uuid());
/// Reload database just in case (and update logger name)
String detach_query = fmt::format("DETACH DATABASE {}", backQuoteIfNeed(DatabaseCatalog::SYSTEM_DATABASE));
auto res = executeQuery(detach_query, context, true);
executeTrivialBlockIO(res, context);
res = {};
/// Unlock UUID mapping, because it will be locked again on database reload.
/// It's safe to do during metadata loading, because cleanup task is not started yet.
DatabaseCatalog::instance().removeUUIDMappingFinally(db_uuid);
for (const auto & uuid : tables_uuids)
DatabaseCatalog::instance().removeUUIDMappingFinally(uuid);
loadSystemDatabaseImpl(context, DatabaseCatalog::SYSTEM_DATABASE, "Atomic");
TablesLoader::Databases databases =
{
{DatabaseCatalog::SYSTEM_DATABASE, DatabaseCatalog::instance().getSystemDatabase()},
};
TablesLoader loader{context, databases, /* force_restore */ true, /* force_attach */ true};
loader.loadTables();
/// Will startup tables usual way
}
catch (Exception & e)
{
e.addMessage("While trying to convert {} to Atomic", database->getDatabaseName());
throw;
}
}
void startupSystemTables()
{

View File

@ -19,4 +19,8 @@ void loadMetadata(ContextMutablePtr context, const String & default_database_nam
/// so we startup system tables after all databases are loaded.
void startupSystemTables();
/// Converts database with Ordinary engine to Atomic. Does nothing if database is not Ordinary.
/// Can be called only during server startup when there are no queries from users.
void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, const DatabasePtr & database);
}

View File

@ -1,3 +1,4 @@
#include <Parsers/ASTIndexDeclaration.h>
#include <iomanip>
#include <IO/Operators.h>
#include <Parsers/ASTAlterQuery.h>
@ -556,6 +557,7 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
switch (alter_object)

View File

@ -0,0 +1,61 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIndexDeclaration.h>
namespace DB
{
/** Get the text that identifies this element. */
String ASTCreateIndexQuery::getID(char delim) const
{
return "CreateIndexQuery" + (delim + getDatabase()) + delim + getTable();
}
ASTPtr ASTCreateIndexQuery::clone() const
{
auto res = std::make_shared<ASTCreateIndexQuery>(*this);
res->children.clear();
res->index_name = index_name->clone();
res->children.push_back(res->index_name);
res->index_decl = index_decl->clone();
res->children.push_back(res->index_decl);
return res;
}
void ASTCreateIndexQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
settings.ostr << "CREATE INDEX " << (if_not_exists ? "IF NOT EXISTS " : "");
index_name->formatImpl(settings, state, frame);
settings.ostr << " ON ";
settings.ostr << (settings.hilite ? hilite_none : "");
if (table)
{
if (database)
{
settings.ostr << indent_str << backQuoteIfNeed(getDatabase());
settings.ostr << ".";
}
settings.ostr << indent_str << backQuoteIfNeed(getTable());
}
formatOnCluster(settings);
if (!cluster.empty())
settings.ostr << " ";
index_decl->formatImpl(settings, state, frame);
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/IAST.h>
namespace DB
{
/** CREATE INDEX [IF NOT EXISTS] name ON [db].name (expression) TYPE type GRANULARITY value
*/
class ASTCreateIndexQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool if_not_exists{false};
ASTPtr index_name;
/// Stores the IndexDeclaration here.
ASTPtr index_decl;
String getID(char delim) const override;
ASTPtr clone() const override;
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams & params) const override
{
return removeOnCluster<ASTCreateIndexQuery>(clone(), params.default_database);
}
virtual QueryKind getQueryKind() const override { return QueryKind::Create; }
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -0,0 +1,63 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTDropIndexQuery.h>
namespace DB
{
/** Get the text that identifies this element. */
String ASTDropIndexQuery::getID(char delim) const
{
return "CreateIndexQuery" + (delim + getDatabase()) + delim + getTable();
}
ASTPtr ASTDropIndexQuery::clone() const
{
auto res = std::make_shared<ASTDropIndexQuery>(*this);
res->children.clear();
res->index_name = index_name->clone();
res->children.push_back(res->index_name);
return res;
}
void ASTDropIndexQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str;
settings.ostr << "DROP INDEX " << (if_exists ? "IF EXISTS " : "");
index_name->formatImpl(settings, state, frame);
settings.ostr << " ON ";
settings.ostr << (settings.hilite ? hilite_none : "");
if (table)
{
if (database)
{
settings.ostr << indent_str << backQuoteIfNeed(getDatabase());
settings.ostr << ".";
}
settings.ostr << indent_str << backQuoteIfNeed(getTable());
}
formatOnCluster(settings);
}
ASTPtr ASTDropIndexQuery::convertToASTAlterCommand() const
{
auto command = std::make_shared<ASTAlterCommand>();
command->index = index_name->clone();
command->if_exists = if_exists;
command->type = ASTAlterCommand::DROP_INDEX;
return command;
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <optional>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/IAST.h>
#include <Parsers/IParserBase.h>
namespace DB
{
/** DROP INDEX [IF EXISTS] name on [db].name
*/
class ASTDropIndexQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool if_exists{false};
ASTPtr index_name;
String getID(char delim) const override;
ASTPtr clone() const override;
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams & params) const override
{
return removeOnCluster<ASTDropIndexQuery>(clone(), params.default_database);
}
virtual QueryKind getQueryKind() const override { return QueryKind::Drop; }
/// Convert ASTDropIndexQuery to ASTAlterCommand.
ASTPtr convertToASTAlterCommand() const;
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -72,8 +72,8 @@ void ASTDropQuery::formatQueryImpl(const FormatSettings & settings, FormatState
if (permanently)
settings.ostr << " PERMANENTLY";
if (no_delay)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " NO DELAY" << (settings.hilite ? hilite_none : "");
if (sync)
settings.ostr << (settings.hilite ? hilite_keyword : "") << " SYNC" << (settings.hilite ? hilite_none : "");
}
}

View File

@ -31,7 +31,7 @@ public:
/// Same as above
bool is_view{false};
bool no_delay{false};
bool sync{false};
// We detach the object permanently, so it will not be reattached back during server restart.
bool permanently{false};

View File

@ -25,9 +25,19 @@ ASTPtr ASTIndexDeclaration::clone() const
void ASTIndexDeclaration::formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const
{
s.ostr << backQuoteIfNeed(name);
s.ostr << " ";
expr->formatImpl(s, state, frame);
if (from_create_index)
{
s.ostr << "(";
expr->formatImpl(s, state, frame);
s.ostr << ")";
}
else
{
s.ostr << backQuoteIfNeed(name);
s.ostr << " ";
expr->formatImpl(s, state, frame);
}
s.ostr << (s.hilite ? hilite_keyword : "") << " TYPE " << (s.hilite ? hilite_none : "");
type->formatImpl(s, state, frame);
s.ostr << (s.hilite ? hilite_keyword : "") << " GRANULARITY " << (s.hilite ? hilite_none : "");

View File

@ -16,6 +16,7 @@ public:
IAST * expr;
ASTFunction * type;
UInt64 granularity;
bool from_create_index = false;
/** Get the text that identifies this element. */
String getID(char) const override { return "Index"; }

View File

@ -840,7 +840,6 @@ bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return true;
}
bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTAlterQuery>();

View File

@ -0,0 +1,120 @@
#include <Parsers/ParserCreateIndexQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserDataType.h>
#include <Parsers/parseDatabaseAndTableName.h>
namespace DB
{
bool ParserCreateIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_type("TYPE");
ParserKeyword s_granularity("GRANULARITY");
ParserDataType data_type_p;
ParserExpression expression_p;
ParserUnsignedInteger granularity_p;
ASTPtr expr;
ASTPtr type;
ASTPtr granularity;
/// Skip name parser for SQL-standard CREATE INDEX
if (!expression_p.parse(pos, expr, expected))
return false;
if (!s_type.ignore(pos, expected))
return false;
if (!data_type_p.parse(pos, type, expected))
return false;
if (!s_granularity.ignore(pos, expected))
return false;
if (!granularity_p.parse(pos, granularity, expected))
return false;
auto index = std::make_shared<ASTIndexDeclaration>();
index->from_create_index = true;
index->granularity = granularity->as<ASTLiteral &>().value.safeGet<UInt64>();
index->set(index->expr, expr);
index->set(index->type, type);
node = index;
return true;
}
bool ParserCreateIndexQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTCreateIndexQuery>();
node = query;
ParserKeyword s_create("CREATE");
ParserKeyword s_index("INDEX");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier index_name_p;
ParserCreateIndexDeclaration parser_create_idx_decl;
ASTPtr index_name;
ASTPtr index_decl;
String cluster_str;
bool if_not_exists = false;
if (!s_create.ignore(pos, expected))
return false;
if (!s_index.ignore(pos, expected))
return false;
if (s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!index_name_p.parse(pos, index_name, expected))
return false;
/// ON [db.] table_name
if (!s_on.ignore(pos, expected))
return false;
if (!parseDatabaseAndTableAsAST(pos, expected, query->database, query->table))
return false;
/// [ON cluster_name]
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
if (!parser_create_idx_decl.parse(pos, index_decl, expected))
return false;
query->index_name = index_name;
query->children.push_back(index_name);
query->index_decl = index_decl;
query->children.push_back(index_decl);
query->if_not_exists = if_not_exists;
query->cluster = cluster_str;
if (query->database)
query->children.push_back(query->database);
if (query->table)
query->children.push_back(query->table);
return true;
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Query like this:
* CREATE INDEX [IF NOT EXISTS] name ON [db].name (expression) TYPE type GRANULARITY value
*/
class ParserCreateIndexQuery : public IParserBase
{
protected:
const char * getName() const override{ return "CREATE INDEX query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Parser for index declaration in create index, where name is ignored. */
class ParserCreateIndexDeclaration : public IParserBase
{
public:
ParserCreateIndexDeclaration() {}
protected:
const char * getName() const override { return "index declaration in create index"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -0,0 +1,67 @@
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserDropIndexQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>
namespace DB
{
bool ParserDropIndexQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
auto query = std::make_shared<ASTDropIndexQuery>();
node = query;
ParserKeyword s_drop("DROP");
ParserKeyword s_index("INDEX");
ParserKeyword s_on("ON");
ParserKeyword s_if_exists("IF EXISTS");
ParserIdentifier index_name_p;
String cluster_str;
bool if_exists = false;
if (!s_drop.ignore(pos, expected))
return false;
if (!s_index.ignore(pos, expected))
return false;
if (s_if_exists.ignore(pos, expected))
if_exists = true;
if (!index_name_p.parse(pos, query->index_name, expected))
return false;
/// ON [db.] table_name
if (!s_on.ignore(pos, expected))
return false;
if (!parseDatabaseAndTableAsAST(pos, expected, query->database, query->table))
return false;
/// [ON cluster_name]
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
query->cluster = std::move(cluster_str);
}
if (query->index_name)
query->children.push_back(query->index_name);
query->if_exists = if_exists;
if (query->database)
query->children.push_back(query->database);
if (query->table)
query->children.push_back(query->table);
return true;
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Query like this:
* DROP INDEX [IF EXISTS] name ON [db].name
*/
class ParserDropIndexQuery : public IParserBase
{
protected:
const char * getName() const override{ return "DROP INDEX query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -31,7 +31,7 @@ bool parseDropQuery(IParser::Pos & pos, ASTPtr & node, Expected & expected, cons
bool temporary = false;
bool is_dictionary = false;
bool is_view = false;
bool no_delay = false;
bool sync = false;
bool permanently = false;
if (s_database.ignore(pos, expected))
@ -83,7 +83,7 @@ bool parseDropQuery(IParser::Pos & pos, ASTPtr & node, Expected & expected, cons
/// actually for TRUNCATE NO DELAY / SYNC means nothing
if (s_no_delay.ignore(pos, expected) || s_sync.ignore(pos, expected))
no_delay = true;
sync = true;
auto query = std::make_shared<ASTDropQuery>();
node = query;
@ -93,7 +93,7 @@ bool parseDropQuery(IParser::Pos & pos, ASTPtr & node, Expected & expected, cons
query->temporary = temporary;
query->is_dictionary = is_dictionary;
query->is_view = is_view;
query->no_delay = no_delay;
query->sync = sync;
query->permanently = permanently;
query->database = database;
query->table = table;

View File

@ -2,7 +2,9 @@
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Parsers/ParserBackupQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserCreateIndexQuery.h>
#include <Parsers/ParserDropFunctionQuery.h>
#include <Parsers/ParserDropIndexQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserInsertQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
@ -43,6 +45,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserCreateSettingsProfileQuery create_settings_profile_p;
ParserCreateFunctionQuery create_function_p;
ParserDropFunctionQuery drop_function_p;
ParserCreateIndexQuery create_index_p;
ParserDropIndexQuery drop_index_p;
ParserDropAccessEntityQuery drop_access_entity_p;
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
@ -63,6 +67,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| create_function_p.parse(pos, node, expected)
|| drop_function_p.parse(pos, node, expected)
|| create_index_p.parse(pos, node, expected)
|| drop_index_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| external_ddl_p.parse(pos, node, expected)

View File

@ -602,8 +602,8 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
Columns columns_list;
UInt64 num_rows = name_to_column_ptr.begin()->second->length();
columns_list.reserve(header.rows());
std::unordered_map<String, BlockPtr> nested_tables;
columns_list.reserve(header.columns());
std::unordered_map<String, std::pair<BlockPtr, std::shared_ptr<NestedColumnExtractHelper>>> nested_tables;
bool skipped = false;
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
{
@ -613,55 +613,57 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
if (case_insensitive_matching)
boost::to_lower(search_column_name);
bool read_from_nested = false;
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
ColumnWithTypeAndName column;
if (!name_to_column_ptr.contains(search_column_name))
{
bool read_from_nested = false;
/// Check if it's a column from nested table.
if (import_nested && name_to_column_ptr.contains(search_nested_table_name))
if (import_nested)
{
if (!nested_tables.contains(search_nested_table_name))
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
{
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols
= {readColumnFromArrowColumn(arrow_column, nested_table_name, format_name, false, dictionary_values, true, true, false, skipped)};
Block block(cols);
nested_tables[search_nested_table_name] = std::make_shared<Block>(Nested::flatten(block));
if (!nested_tables.contains(search_nested_table_name))
{
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {readColumnFromArrowColumn(
arrow_column, nested_table_name, format_name, false, dictionary_values, true, true, false, skipped)};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
}
auto nested_column = nested_tables[search_nested_table_name].second->extractColumn(search_column_name);
if (nested_column)
{
column = *nested_column;
if (case_insensitive_matching)
column.name = header_column.name;
read_from_nested = true;
}
}
read_from_nested = nested_tables[search_nested_table_name]->has(header_column.name, case_insensitive_matching);
}
if (!read_from_nested)
{
if (!allow_missing_columns)
throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", header_column.name};
ColumnWithTypeAndName column;
column.name = header_column.name;
column.type = header_column.type;
column.column = header_column.column->cloneResized(num_rows);
columns_list.push_back(std::move(column.column));
continue;
else
{
column.name = header_column.name;
column.type = header_column.type;
column.column = header_column.column->cloneResized(num_rows);
columns_list.push_back(std::move(column.column));
continue;
}
}
}
ColumnWithTypeAndName column;
if (read_from_nested)
{
column = nested_tables[search_nested_table_name]->getByName(header_column.name, case_insensitive_matching);
if (case_insensitive_matching)
column.name = header_column.name;
}
else
{
auto arrow_column = name_to_column_ptr[search_column_name];
column = readColumnFromArrowColumn(arrow_column, header_column.name, format_name, false, dictionary_values, true, true, false, skipped);
column = readColumnFromArrowColumn(
arrow_column, header_column.name, format_name, false, dictionary_values, true, true, false, skipped);
}
try
@ -689,23 +691,17 @@ std::vector<size_t> ArrowColumnToCHColumn::getMissingColumns(const arrow::Schema
{
std::vector<size_t> missing_columns;
auto block_from_arrow = arrowSchemaToCHHeader(schema, format_name, false, &header, case_insensitive_matching);
auto flatten_block_from_arrow = Nested::flatten(block_from_arrow);
NestedColumnExtractHelper nested_columns_extractor(block_from_arrow, case_insensitive_matching);
for (size_t i = 0, columns = header.columns(); i < columns; ++i)
{
const auto & header_column = header.getByPosition(i);
bool read_from_nested = false;
String nested_table_name = Nested::extractTableName(header_column.name);
if (!block_from_arrow.has(header_column.name, case_insensitive_matching))
{
if (import_nested && block_from_arrow.has(nested_table_name, case_insensitive_matching))
read_from_nested = flatten_block_from_arrow.has(header_column.name, case_insensitive_matching);
if (!read_from_nested)
if (!import_nested || !nested_columns_extractor.extractColumn(header_column.name))
{
if (!allow_missing_columns)
throw Exception{ErrorCodes::THERE_IS_NO_COLUMN, "Column '{}' is not presented in input data.", header_column.name};
missing_columns.push_back(i);
}
}

View File

@ -13,10 +13,20 @@ DistinctSortedTransform::DistinctSortedTransform(
: ISimpleTransform(header_, header_, true)
, header(std::move(header_))
, description(std::move(sort_description))
, columns_names(columns)
, column_names(columns)
, limit_hint(limit_hint_)
, set_size_limits(set_size_limits_)
{
/// pre-calculate column positions to use during chunk transformation
const size_t num_columns = column_names.empty() ? header.columns() : column_names.size();
column_positions.reserve(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
auto pos = column_names.empty() ? i : header.getPositionByName(column_names[i]);
const auto & col = header.getByPosition(pos).column;
if (col && !isColumnConst(*col))
column_positions.emplace_back(pos);
}
}
void DistinctSortedTransform::transform(Chunk & chunk)
@ -119,24 +129,13 @@ bool DistinctSortedTransform::buildFilter(
ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const
{
size_t columns = columns_names.empty() ? chunk.getNumColumns() : columns_names.size();
ColumnRawPtrs column_ptrs;
column_ptrs.reserve(columns);
for (size_t i = 0; i < columns; ++i)
column_ptrs.reserve(column_positions.size());
for (const auto pos : column_positions)
{
auto pos = i;
if (!columns_names.empty())
pos = input.getHeader().getPositionByName(columns_names[i]);
const auto & column = chunk.getColumns()[pos];
/// Ignore all constant columns.
if (!isColumnConst(*column))
column_ptrs.emplace_back(column.get());
column_ptrs.emplace_back(column.get());
}
return column_ptrs;
}

View File

@ -3,6 +3,7 @@
#include <Processors/ISimpleTransform.h>
#include <Interpreters/SetVariants.h>
#include <Core/SortDescription.h>
#include <Core/ColumnNumbers.h>
namespace DB
@ -57,7 +58,8 @@ private:
};
PreviousChunk prev_chunk;
Names columns_names;
Names column_names;
ColumnNumbers column_positions;
ClearableSetVariants data;
Sizes key_sizes;
UInt64 limit_hint;

View File

@ -37,6 +37,7 @@
#include <Storages/Hive/StorageHiveMetadata.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/StorageFactory.h>
#include <DataTypes/NestedUtils.h>
namespace DB
{
@ -766,8 +767,20 @@ Pipe StorageHive::read(
sources_info->hive_metastore_client = hive_metastore_client;
sources_info->partition_name_types = partition_name_types;
Block sample_block;
const auto header_block = storage_snapshot->metadata->getSampleBlock();
bool support_subset_columns = supportsSubcolumns();
auto settings = context_->getSettingsRef();
auto case_insensitive_matching = [&]() -> bool
{
if (format_name == "Parquet")
return settings.input_format_parquet_case_insensitive_column_matching;
else if (format_name == "ORC")
return settings.input_format_orc_case_insensitive_column_matching;
return false;
};
Block sample_block;
NestedColumnExtractHelper nested_columns_extractor(header_block, case_insensitive_matching());
for (const auto & column : column_names)
{
if (header_block.has(column))
@ -775,7 +788,15 @@ Pipe StorageHive::read(
sample_block.insert(header_block.getByName(column));
continue;
}
else if (support_subset_columns)
{
auto subset_column = nested_columns_extractor.extractColumn(column);
if (subset_column)
{
sample_block.insert(std::move(*subset_column));
continue;
}
}
if (column == "_path")
sources_info->need_path_column = true;
if (column == "_file")

View File

@ -42,6 +42,9 @@ public:
String getName() const override { return "Hive"; }
bool supportsIndexForIn() const override { return true; }
bool supportsSubcolumns() const override { return true; }
bool mayBenefitFromIndexForIn(
const ASTPtr & /* left_in_operand */,
ContextPtr /* query_context */,

View File

@ -396,7 +396,7 @@ public:
*/
virtual void drop() {}
virtual void dropInnerTableIfAny(bool /* no_delay */, ContextPtr /* context */) {}
virtual void dropInnerTableIfAny(bool /* sync */, ContextPtr /* context */) {}
/** Clear the table data and leave it empty.
* Must be called under exclusive lock (lockExclusively).

View File

@ -1324,20 +1324,29 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
{
const DataPartPtr & part = *it;
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
auto creation_csn = (*it)->version.creation_csn.load(std::memory_order_relaxed);
if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !(*it)->version.isRemovalTIDLocked())
part->remove_time.store(part->modification_time, std::memory_order_relaxed);
auto creation_csn = part->version.creation_csn.load(std::memory_order_relaxed);
if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !part->version.isRemovalTIDLocked())
{
/// It's possible that covering part was created without transaction,
/// but if covered part was created with transaction (i.e. creation_tid is not prehistoric),
/// then it must have removal tid in metadata file.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data part {} is Outdated and has creation TID {} and CSN {}, "
"but does not have removal tid. It's a bug or a result of manual intervention.",
(*it)->name, (*it)->version.creation_tid, creation_csn);
part->name, part->version.creation_tid, creation_csn);
}
modifyPartState(it, DataPartState::Outdated);
removePartContributionToDataVolume(*it);
removePartContributionToDataVolume(part);
/// Explicitly set removal_tid_lock for parts w/o transaction (i.e. w/o txn_version.txt)
/// to avoid keeping part forever (see VersionMetadata::canBeRemoved())
if (!part->version.isRemovalTIDLocked())
{
TransactionInfoContext transaction_context{getStorageID(), part->name};
part->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context);
}
};
/// All parts are in "Active" state after loading

View File

@ -304,12 +304,22 @@ static StoragePtr create(const StorageFactory::Arguments & args)
arg_idx, e.message(), getMergeTreeVerboseHelp(is_extended_storage_def));
}
}
else if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_deprecated_syntax_for_merge_tree)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "This syntax for *MergeTree engine is deprecated. "
"Use extended storage definition syntax with ORDER BY/PRIMARY KEY clause."
"See also allow_deprecated_syntax_for_merge_tree setting.");
}
/// For Replicated.
String zookeeper_path;
String replica_name;
StorageReplicatedMergeTree::RenamingRestrictions renaming_restrictions = StorageReplicatedMergeTree::RenamingRestrictions::ALLOW_ANY;
bool is_on_cluster = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated";
if (replicated)
{
bool has_arguments = arg_num + 2 <= arg_cnt;
@ -372,17 +382,11 @@ static StoragePtr create(const StorageFactory::Arguments & args)
throw Exception("Expected two string literal arguments: zookeeper_path and replica_name", ErrorCodes::BAD_ARGUMENTS);
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
bool is_on_cluster = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated";
bool allow_uuid_macro = is_on_cluster || is_replicated_database || args.query.attach;
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (!args.attach)
{
if (is_replicated_database && !is_extended_storage_def)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Old syntax is not allowed for ReplicatedMergeTree tables in Replicated databases");
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;

View File

@ -240,7 +240,7 @@ void StorageMaterializedPostgreSQL::shutdown()
}
void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool sync, ContextPtr local_context)
{
/// If it is a table with database engine MaterializedPostgreSQL - return, because delition of
/// internal tables is managed there.
@ -252,7 +252,7 @@ void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPt
auto nested_table = tryGetNested() != nullptr;
if (nested_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), no_delay);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), sync);
}

View File

@ -84,7 +84,7 @@ public:
void shutdown() override;
/// Used only for single MaterializedPostgreSQL storage.
void dropInnerTableIfAny(bool no_delay, ContextPtr local_context) override;
void dropInnerTableIfAny(bool sync, ContextPtr local_context) override;
NamesAndTypesList getVirtuals() const override;

View File

@ -142,7 +142,7 @@ Chunk RabbitMQSource::generateImpl()
while (true)
{
if (buffer->eof())
if (buffer->queueEmpty())
break;
auto new_rows = executor.execute();

View File

@ -47,6 +47,12 @@ ReadBufferFromRabbitMQConsumer::~ReadBufferFromRabbitMQConsumer()
}
void ReadBufferFromRabbitMQConsumer::closeChannel()
{
if (consumer_channel)
consumer_channel->close();
}
void ReadBufferFromRabbitMQConsumer::subscribe()
{
for (const auto & queue_name : queues)

View File

@ -3,18 +3,24 @@
#include <Core/Names.h>
#include <base/types.h>
#include <IO/ReadBuffer.h>
#include <amqpcpp.h>
#include <Storages/RabbitMQ/RabbitMQHandler.h>
#include <Common/ConcurrentBoundedQueue.h>
namespace Poco
{
class Logger;
class Logger;
}
namespace AMQP
{
class TcpChannel;
}
namespace DB
{
class RabbitMQHandler;
using ChannelPtr = std::unique_ptr<AMQP::TcpChannel>;
class ReadBufferFromRabbitMQConsumer : public ReadBuffer
{
@ -52,11 +58,7 @@ public:
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool needChannelUpdate();
void closeChannel()
{
if (consumer_channel)
consumer_channel->close();
}
void closeChannel();
void updateQueues(std::vector<String> & queues_) { queues = queues_; }
size_t queuesCount() { return queues.size(); }

View File

@ -215,10 +215,10 @@ void StorageMaterializedView::drop()
dropInnerTableIfAny(true, getContext());
}
void StorageMaterializedView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
void StorageMaterializedView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
{
if (has_inner_table && tryGetTargetTable())
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync);
}
void StorageMaterializedView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)

View File

@ -42,7 +42,7 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
void drop() override;
void dropInnerTableIfAny(bool no_delay, ContextPtr local_context) override;
void dropInnerTableIfAny(bool sync, ContextPtr local_context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;

View File

@ -598,6 +598,9 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
auto zookeeper = getZooKeeper();
std::vector<zkutil::ZooKeeper::FutureCreate> futures;
/// We need to confirm /quorum exists here although it's called under createTableIfNotExists because in older CH releases (pre 22.4)
/// it was created here, so if metadata creation is done by an older replica the node might not exists when reaching this call
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/quorum", String(), zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/quorum/parallel", String(), zkutil::CreateMode::Persistent));
/// Nodes for remote fs zero-copy replication

View File

@ -1606,7 +1606,7 @@ void StorageWindowView::drop()
dropInnerTableIfAny(true, getContext());
}
void StorageWindowView::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context)
{
if (!std::exchange(has_inner_table, false))
return;
@ -1614,10 +1614,10 @@ void StorageWindowView::dropInnerTableIfAny(bool no_delay, ContextPtr local_cont
try
{
InterpreterDropQuery::executeDropQuery(
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, no_delay);
ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, sync);
if (has_inner_target_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, no_delay);
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync);
}
catch (...)
{

View File

@ -120,7 +120,7 @@ public:
void checkTableCanBeDropped() const override;
void dropInnerTableIfAny(bool no_delay, ContextPtr context) override;
void dropInnerTableIfAny(bool sync, ContextPtr context) override;
void drop() override;

View File

@ -225,7 +225,7 @@ if __name__ == "__main__":
output_path_log = os.path.join(result_path, "main_script_log.txt")
runner_path = os.path.join(repo_path, "tests/integration", "ci-runner.py")
run_command = f"sudo -E {runner_path} | tee {output_path_log}"
run_command = f"sudo -E {runner_path}"
logging.info("Going to run command: `%s`", run_command)
logging.info(
"ENV parameters for runner:\n%s",

View File

@ -1490,9 +1490,9 @@ def collect_build_flags(args):
result.append(BuildFlags.RELEASE)
value = clickhouse_execute(
args, "SELECT value FROM system.settings WHERE name = 'default_database_engine'"
args, "SELECT value FROM system.settings WHERE name = 'allow_deprecated_database_ordinary'"
)
if value == b"Ordinary":
if value == b"1":
result.append(BuildFlags.ORDINARY_DATABASE)
value = int(

View File

@ -1,7 +1,7 @@
<clickhouse>
<profiles>
<default>
<default_database_engine>Ordinary</default_database_engine>
<allow_deprecated_database_ordinary>1</allow_deprecated_database_ordinary>
</default>
</profiles>
</clickhouse>

Some files were not shown because too many files have changed in this diff Show More