Merge branch 'master' into readonly-settings-allow

This commit is contained in:
Sergei Trifonov 2022-09-19 21:00:12 +02:00 committed by GitHub
commit 7902922ae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 599 additions and 102 deletions

View File

@ -3433,7 +3433,7 @@ Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 0.
Default value: 1.
## input_format_with_names_use_header {#input_format_with_names_use_header}

View File

@ -155,7 +155,7 @@ Example of configuration for versions earlier than 22.8:
<endpoint>...</endpoint>
... s3 configuration ...
<data_cache_enabled>1</data_cache_enabled>
<data_cache_size>10000000</data_cache_size>
<data_cache_max_size>10000000</data_cache_max_size>
</s3>
</disks>
<policies>

View File

@ -64,6 +64,11 @@ public:
return nested_func->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_func->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_func->getDefaultVersion();
@ -79,6 +84,11 @@ public:
nested_func->destroy(place);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
nested_func->destroyUpToState(place);
}
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();

View File

@ -225,6 +225,12 @@ public:
nested_func->destroy(getNestedPlace(place));
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
this->data(place).~Data();
nested_func->destroyUpToState(getNestedPlace(place));
}
String getName() const override
{
return nested_func->getName() + "Distinct";
@ -245,6 +251,21 @@ public:
return nested_func->isState();
}
bool isVersioned() const override
{
return nested_func->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_func->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_func->getDefaultVersion();
}
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
};

View File

@ -66,6 +66,7 @@ private:
if (old_size < new_size)
{
char * old_state = state.array_of_aggregate_datas;
char * new_state = arena.alignedAlloc(
new_size * nested_size_of_data,
nested_func->alignOfData());
@ -134,23 +135,43 @@ public:
return nested_func->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_func->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_func->getDefaultVersion();
}
void destroy(AggregateDataPtr __restrict place) const noexcept override
template <bool up_to_state>
void destroyImpl(AggregateDataPtr __restrict place) const noexcept
{
AggregateFunctionForEachData & state = data(place);
char * nested_state = state.array_of_aggregate_datas;
for (size_t i = 0; i < state.dynamic_array_size; ++i)
{
nested_func->destroy(nested_state);
if constexpr (up_to_state)
nested_func->destroyUpToState(nested_state);
else
nested_func->destroy(nested_state);
nested_state += nested_size_of_data;
}
}
void destroy(AggregateDataPtr __restrict place) const noexcept override
{
destroyImpl<false>(place);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
destroyImpl<true>(place);
}
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();

View File

@ -71,6 +71,11 @@ public:
return nested_func->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_func->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_func->getDefaultVersion();
@ -86,6 +91,11 @@ public:
nested_func->destroy(place);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
nested_func->destroyUpToState(place);
}
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();

View File

@ -84,6 +84,26 @@ private:
using Base = IAggregateFunctionDataHelper<Data, AggregateFunctionMap<KeyType>>;
public:
bool isState() const override
{
return nested_func->isState();
}
bool isVersioned() const override
{
return nested_func->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_func->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_func->getDefaultVersion();
}
AggregateFunctionMap(AggregateFunctionPtr nested, const DataTypes & types) : Base(types, nested->getParameters()), nested_func(nested)
{
if (types.empty())
@ -187,6 +207,32 @@ public:
}
}
template <bool up_to_state>
void destroyImpl(AggregateDataPtr __restrict place) const noexcept
{
AggregateFunctionMapCombinatorData<KeyType> & state = Base::data(place);
for (const auto & [key, nested_place] : state.merged_maps)
{
if constexpr (up_to_state)
nested_func->destroyUpToState(nested_place);
else
nested_func->destroy(nested_place);
}
state.~Data();
}
void destroy(AggregateDataPtr __restrict place) const noexcept override
{
destroyImpl<false>(place);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
destroyImpl<true>(place);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & merged_maps = this->data(place).merged_maps;

View File

@ -80,6 +80,11 @@ public:
nested_func->destroy(place);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
nested_func->destroyUpToState(place);
}
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();
@ -126,6 +131,11 @@ public:
}
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
bool isState() const override
{
return nested_func->isState();
}
};
}

View File

@ -114,6 +114,11 @@ public:
nested_function->destroy(nestedPlace(place));
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
nested_function->destroyUpToState(nestedPlace(place));
}
bool hasTrivialDestructor() const override
{
return nested_function->hasTrivialDestructor();
@ -189,6 +194,21 @@ public:
return nested_function->isState();
}
bool isVersioned() const override
{
return nested_function->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_function->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_function->getDefaultVersion();
}
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
#if USE_EMBEDDED_COMPILER

View File

@ -98,6 +98,11 @@ public:
nested_function->destroy(place);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
nested_function->destroyUpToState(place);
}
void add(
AggregateDataPtr __restrict place,
const IColumn ** columns,

View File

@ -91,6 +91,21 @@ public:
return nested_function->isState();
}
bool isVersioned() const override
{
return nested_function->isVersioned();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_function->getVersionFromRevision(revision);
}
size_t getDefaultVersion() const override
{
return nested_function->getDefaultVersion();
}
bool allocatesMemoryInArena() const override
{
return nested_function->allocatesMemoryInArena();
@ -134,6 +149,12 @@ public:
nested_function->destroy(place + i * size_of_data);
}
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
{
for (size_t i = 0; i < total; ++i)
nested_function->destroyUpToState(place + i * size_of_data);
}
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
Key key;

View File

@ -56,10 +56,22 @@ public:
return nested_func->getDefaultVersion();
}
bool isState() const override
{
return nested_func->isState();
}
size_t getVersionFromRevision(size_t revision) const override
{
return nested_func->getVersionFromRevision(revision);
}
void create(AggregateDataPtr __restrict place) const override { nested_func->create(place); }
void destroy(AggregateDataPtr __restrict place) const noexcept override { nested_func->destroy(place); }
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override { nested_func->destroyUpToState(place); }
bool hasTrivialDestructor() const override { return nested_func->hasTrivialDestructor(); }
size_t sizeOfData() const override { return nested_func->sizeOfData(); }

View File

@ -69,6 +69,8 @@ public:
nested_func->destroy(place);
}
void destroyUpToState(AggregateDataPtr __restrict) const noexcept override {}
bool hasTrivialDestructor() const override
{
return nested_func->hasTrivialDestructor();

View File

@ -113,6 +113,17 @@ public:
/// Delete data for aggregation.
virtual void destroy(AggregateDataPtr __restrict place) const noexcept = 0;
/// Delete all combinator states that were used after combinator -State.
/// For example for uniqArrayStateForEachMap(...) it will destroy
/// states that were created by combinators Map and ForEach.
/// It's needed because ColumnAggregateFunction in the result will be
/// responsible only for destruction of states that were created
/// by aggregate function and all combinators before -State combinator.
virtual void destroyUpToState(AggregateDataPtr __restrict place) const noexcept
{
destroy(place);
}
/// It is not necessary to delete data.
virtual bool hasTrivialDestructor() const = 0;
@ -277,8 +288,7 @@ public:
Arena * arena) const = 0;
/** Insert result of aggregate function into result column with batch size.
* If destroy_place_after_insert is true. Then implementation of this method
* must destroy aggregate place if insert state into result column was successful.
* The implementation of this method will destroy aggregate place up to -State if insert state into result column was successful.
* All places that were not inserted must be destroyed if there was exception during insert into result column.
*/
virtual void insertResultIntoBatch(
@ -287,8 +297,7 @@ public:
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
Arena * arena,
bool destroy_place_after_insert) const = 0;
Arena * arena) const = 0;
/** Destroy batch of aggregate places.
*/
@ -612,8 +621,7 @@ public:
AggregateDataPtr * places,
size_t place_offset,
IColumn & to,
Arena * arena,
bool destroy_place_after_insert) const override
Arena * arena) const override
{
size_t batch_index = row_begin;
@ -622,9 +630,9 @@ public:
for (; batch_index < row_end; ++batch_index)
{
static_cast<const Derived *>(this)->insertResultInto(places[batch_index] + place_offset, to, arena);
if (destroy_place_after_insert)
static_cast<const Derived *>(this)->destroy(places[batch_index] + place_offset);
/// For State AggregateFunction ownership of aggregate place is passed to result column after insert,
/// so we need to destroy all states up to state of -State combinator.
static_cast<const Derived *>(this)->destroyUpToState(places[batch_index] + place_offset);
}
}
catch (...)

View File

@ -162,7 +162,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
};
callback(res);
res->forEachSubcolumn(callback);
res->forEachSubcolumnRecursively(callback);
for (auto * val : data)
func->insertResultInto(val, *res, &column_aggregate_func.createOrGetArena());

View File

@ -157,6 +157,14 @@ public:
callback(data);
}
void forEachSubcolumnRecursively(ColumnCallback callback) override
{
callback(offsets);
offsets->forEachSubcolumnRecursively(callback);
callback(data);
data->forEachSubcolumnRecursively(callback);
}
bool structureEquals(const IColumn & rhs) const override
{
if (const auto * rhs_concrete = typeid_cast<const ColumnArray *>(&rhs))

View File

@ -235,6 +235,12 @@ public:
callback(data);
}
void forEachSubcolumnRecursively(ColumnCallback callback) override
{
callback(data);
data->forEachSubcolumnRecursively(callback);
}
bool structureEquals(const IColumn & rhs) const override
{
if (const auto * rhs_concrete = typeid_cast<const ColumnConst *>(&rhs))

View File

@ -173,6 +173,19 @@ public:
callback(dictionary.getColumnUniquePtr());
}
void forEachSubcolumnRecursively(ColumnCallback callback) override
{
callback(idx.getPositionsPtr());
idx.getPositionsPtr()->forEachSubcolumnRecursively(callback);
/// Column doesn't own dictionary if it's shared.
if (!dictionary.isShared())
{
callback(dictionary.getColumnUniquePtr());
dictionary.getColumnUniquePtr()->forEachSubcolumnRecursively(callback);
}
}
bool structureEquals(const IColumn & rhs) const override
{
if (const auto * rhs_low_cardinality = typeid_cast<const ColumnLowCardinality *>(&rhs))

View File

@ -278,6 +278,12 @@ void ColumnMap::forEachSubcolumn(ColumnCallback callback)
callback(nested);
}
void ColumnMap::forEachSubcolumnRecursively(ColumnCallback callback)
{
callback(nested);
nested->forEachSubcolumnRecursively(callback);
}
bool ColumnMap::structureEquals(const IColumn & rhs) const
{
if (const auto * rhs_map = typeid_cast<const ColumnMap *>(&rhs))

View File

@ -89,6 +89,7 @@ public:
size_t allocatedBytes() const override;
void protect() override;
void forEachSubcolumn(ColumnCallback callback) override;
void forEachSubcolumnRecursively(ColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;

View File

@ -136,6 +136,14 @@ public:
callback(null_map);
}
void forEachSubcolumnRecursively(ColumnCallback callback) override
{
callback(nested_column);
nested_column->forEachSubcolumnRecursively(callback);
callback(null_map);
null_map->forEachSubcolumnRecursively(callback);
}
bool structureEquals(const IColumn & rhs) const override
{
if (const auto * rhs_nullable = typeid_cast<const ColumnNullable *>(&rhs))

View File

@ -671,6 +671,18 @@ void ColumnObject::forEachSubcolumn(ColumnCallback callback)
callback(part);
}
void ColumnObject::forEachSubcolumnRecursively(ColumnCallback callback)
{
for (auto & entry : subcolumns)
{
for (auto & part : entry->data.data)
{
callback(part);
part->forEachSubcolumnRecursively(callback);
}
}
}
void ColumnObject::insert(const Field & field)
{
const auto & object = field.get<const Object &>();

View File

@ -211,6 +211,7 @@ public:
size_t byteSize() const override;
size_t allocatedBytes() const override;
void forEachSubcolumn(ColumnCallback callback) override;
void forEachSubcolumnRecursively(ColumnCallback callback) override;
void insert(const Field & field) override;
void insertDefault() override;
void insertFrom(const IColumn & src, size_t n) override;

View File

@ -750,6 +750,14 @@ void ColumnSparse::forEachSubcolumn(ColumnCallback callback)
callback(offsets);
}
void ColumnSparse::forEachSubcolumnRecursively(ColumnCallback callback)
{
callback(values);
values->forEachSubcolumnRecursively(callback);
callback(offsets);
offsets->forEachSubcolumnRecursively(callback);
}
const IColumn::Offsets & ColumnSparse::getOffsetsData() const
{
return assert_cast<const ColumnUInt64 &>(*offsets).getData();

View File

@ -140,6 +140,7 @@ public:
ColumnPtr compress() const override;
void forEachSubcolumn(ColumnCallback callback) override;
void forEachSubcolumnRecursively(ColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -501,6 +501,15 @@ void ColumnTuple::forEachSubcolumn(ColumnCallback callback)
callback(column);
}
void ColumnTuple::forEachSubcolumnRecursively(ColumnCallback callback)
{
for (auto & column : columns)
{
callback(column);
column->forEachSubcolumnRecursively(callback);
}
}
bool ColumnTuple::structureEquals(const IColumn & rhs) const
{
if (const auto * rhs_tuple = typeid_cast<const ColumnTuple *>(&rhs))

View File

@ -97,6 +97,7 @@ public:
size_t allocatedBytes() const override;
void protect() override;
void forEachSubcolumn(ColumnCallback callback) override;
void forEachSubcolumnRecursively(ColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;
bool isCollationSupported() const override;
ColumnPtr compress() const override;

View File

@ -113,6 +113,15 @@ public:
nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
}
void forEachSubcolumnRecursively(IColumn::ColumnCallback callback) override
{
callback(column_holder);
column_holder->forEachSubcolumnRecursively(callback);
reverse_index.setColumn(getRawColumnPtr());
if (is_nullable)
nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
}
bool structureEquals(const IColumn & rhs) const override
{
if (auto rhs_concrete = typeid_cast<const ColumnUnique *>(&rhs))

View File

@ -414,6 +414,9 @@ public:
using ColumnCallback = std::function<void(WrappedPtr&)>;
virtual void forEachSubcolumn(ColumnCallback) {}
/// Similar to forEachSubcolumn but it also do recursive calls.
virtual void forEachSubcolumnRecursively(ColumnCallback) {}
/// Columns have equal structure.
/// If true - you can use "compareAt", "insertFrom", etc. methods.
[[nodiscard]] virtual bool structureEquals(const IColumn &) const

View File

@ -410,6 +410,17 @@ The server successfully detected this situation and will download merged part fr
M(OverflowBreak, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'break' and the result is incomplete.") \
M(OverflowThrow, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'throw' and exception was thrown.") \
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
M(DeleteS3Objects, "Number of s3 API DeleteObjects be called") \
M(CopyS3Object, "Number of s3 API CopyObject be called") \
M(ListS3Objects, "Number of s3 API ListObjects be called") \
M(HeadS3Object, "Number of s3 API HeadObject be called") \
M(CreateS3MultipartUpload, "Number of s3 API CreateMultipartUpload be called") \
M(UploadS3PartCopy, "Number of s3 API UploadPartCopy be called") \
M(UploadS3Part, "Number of s3 API UploadS3Part be called") \
M(AbortS3MultipartUpload, "Number of s3 API AbortMultipartUpload be called") \
M(CompleteS3MultipartUpload, "Number of s3 API CompleteS3MultipartUpload be called") \
M(PutS3ObjectRequest, "Number of s3 API PutS3ObjectRequest be called") \
M(GetS3ObjectRequest, "Number of s3 API GetS3ObjectRequest be called")
namespace ProfileEvents
{

View File

@ -185,6 +185,7 @@ void DatabaseOnDisk::createTable(
if (create.attach_short_syntax)
{
/// Metadata already exists, table was detached
assert(fs::exists(getObjectMetadataPath(table_name)));
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, true);
attachTable(local_context, table_name, table, getTableDataPath(create));
return;

View File

@ -1259,4 +1259,24 @@ void DatabaseReplicated::createTableRestoredFromBackup(
}
}
bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const
{
if (query_context->getClientInfo().is_replicated_database_internal)
return false;
/// Some ALTERs are not replicated on database level
if (const auto * alter = query_ptr->as<const ASTAlterQuery>())
{
return !alter->isAttachAlter() && !alter->isFetchAlter() && !alter->isDropPartitionAlter();
}
/// DROP DATABASE is not replicated
if (const auto * drop = query_ptr->as<const ASTDropQuery>())
{
return drop->table.get();
}
return true;
}
}

View File

@ -46,7 +46,7 @@ public:
/// Try to execute DLL query on current host as initial query. If query is succeed,
/// then it will be executed on all replicas.
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal = false);
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal) override;
bool hasReplicationThread() const override { return true; }
@ -75,6 +75,8 @@ public:
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
friend struct DatabaseReplicatedTask;
friend class DatabaseReplicatedDDLWorker;
private:

View File

@ -8,6 +8,7 @@
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <QueryPipeline/BlockIO.h>
#include <ctime>
#include <functional>
@ -338,6 +339,13 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not run a replication thread!", getEngineName());
}
virtual bool shouldReplicateQuery(const ContextPtr & /*query_context*/, const ASTPtr & /*query_ptr*/) const { return false; }
virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] bool internal = false) /// NOLINT
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not have replicated DDL queue", getEngineName());
}
/// Returns CREATE TABLE queries and corresponding tables prepared for writing to a backup.
virtual std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & context) const;

View File

@ -1,4 +1,5 @@
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Common/ProfileEvents.h>
#if USE_AWS_S3
@ -31,6 +32,18 @@
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
namespace ProfileEvents
{
extern const Event DeleteS3Objects;
extern const Event HeadS3Object;
extern const Event ListS3Objects;
extern const Event CopyS3Object;
extern const Event CreateS3MultipartUpload;
extern const Event UploadS3PartCopy;
extern const Event AbortS3MultipartUpload;
extern const Event CompleteS3MultipartUpload;
}
namespace DB
{
@ -96,6 +109,8 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
{
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::HeadS3Object);
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket_from);
request.SetKey(key);
@ -211,6 +226,7 @@ void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::ListS3Objects);
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetPrefix(path);
@ -239,6 +255,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
{
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::DeleteS3Objects);
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(object.absolute_path);
@ -284,6 +301,8 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);
ProfileEvents::increment(ProfileEvents::DeleteS3Objects);
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
@ -357,6 +376,8 @@ void S3ObjectStorage::copyObjectImpl(
std::optional<ObjectAttributes> metadata) const
{
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::CopyS3Object);
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
@ -405,6 +426,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
String multipart_upload_id;
{
ProfileEvents::increment(ProfileEvents::CreateS3MultipartUpload);
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
@ -423,6 +445,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
size_t upload_part_size = settings_ptr->s3_settings.min_upload_part_size;
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
{
ProfileEvents::increment(ProfileEvents::UploadS3PartCopy);
Aws::S3::Model::UploadPartCopyRequest part_request;
part_request.SetCopySource(src_bucket + "/" + src_key);
part_request.SetBucket(dst_bucket);
@ -434,6 +457,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
auto outcome = client_ptr->UploadPartCopy(part_request);
if (!outcome.IsSuccess())
{
ProfileEvents::increment(ProfileEvents::AbortS3MultipartUpload);
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dst_bucket);
abort_request.SetKey(dst_key);
@ -448,6 +472,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
}
{
ProfileEvents::increment(ProfileEvents::CompleteS3MultipartUpload);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(dst_bucket);
req.SetKey(dst_key);

View File

@ -2,6 +2,7 @@
#include <Columns/ColumnString.h>
#include <Poco/UTF8Encoding.h>
#include <Common/UTF8Helpers.h>
#include <base/defines.h>
#ifdef __SSE2__
#include <emmintrin.h>
@ -89,9 +90,11 @@ struct LowerUpperUTF8Impl
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
if (data.empty())
return;
res_data.resize(data.size());
res_offsets.assign(offsets);
array(data.data(), data.data() + data.size(), res_data.data());
array(data.data(), data.data() + data.size(), offsets, res_data.data());
}
static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
@ -164,8 +167,11 @@ private:
static constexpr auto ascii_upper_bound = '\x7f';
static constexpr auto flip_case_mask = 'A' ^ 'a';
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
static void array(const UInt8 * src, const UInt8 * src_end, const ColumnString::Offsets & offsets, UInt8 * dst)
{
auto offset_it = offsets.begin();
const UInt8 * begin = src;
#ifdef __SSE2__
static constexpr auto bytes_sse = sizeof(__m128i);
const auto * src_end_sse = src + (src_end - src) / bytes_sse * bytes_sse;
@ -213,10 +219,17 @@ private:
else
{
/// UTF-8
const auto * expected_end = src + bytes_sse;
size_t offset_from_begin = src - begin;
while (offset_from_begin >= *offset_it)
++offset_it;
/// Do not allow one row influence another (since row may have invalid sequence, and break the next)
const UInt8 * row_end = begin + *offset_it;
chassert(row_end >= src);
const UInt8 * expected_end = std::min(src + bytes_sse, row_end);
while (src < expected_end)
toCase(src, src_end, dst);
toCase(src, expected_end, dst);
/// adjust src_end_sse by pushing it forward or backward
const auto diff = src - expected_end;
@ -229,10 +242,22 @@ private:
}
}
}
/// Find which offset src has now
while (offset_it != offsets.end() && static_cast<size_t>(src - begin) >= *offset_it)
++offset_it;
#endif
/// handle remaining symbols
/// handle remaining symbols, row by row (to avoid influence of bad UTF8 symbols from one row, to another)
while (src < src_end)
toCase(src, src_end, dst);
{
const UInt8 * row_end = begin + *offset_it;
chassert(row_end >= src);
while (src < row_end)
toCase(src, row_end, dst);
++offset_it;
}
}
};

View File

@ -24,6 +24,7 @@ namespace ProfileEvents
extern const Event ReadBufferFromS3Bytes;
extern const Event ReadBufferFromS3RequestsErrors;
extern const Event ReadBufferSeekCancelConnection;
extern const Event GetS3ObjectRequest;
}
namespace DB
@ -275,6 +276,7 @@ SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
ProfileEvents::increment(ProfileEvents::GetS3ObjectRequest);
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);

View File

@ -1,4 +1,5 @@
#include <Common/config.h>
#include <Common/ProfileEvents.h>
#if USE_AWS_S3
@ -24,6 +25,10 @@
namespace ProfileEvents
{
extern const Event WriteBufferFromS3Bytes;
extern const Event S3WriteBytes;
extern const Event CompleteS3MultipartUpload;
extern const Event UploadS3Part;
extern const Event PutS3ObjectRequest;
}
namespace DB
@ -303,6 +308,7 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::UploadS3Part);
auto outcome = client_ptr->UploadPart(task.req);
if (outcome.IsSuccess())
@ -326,6 +332,7 @@ void WriteBufferFromS3::completeMultipartUpload()
if (tags.empty())
throw Exception("Failed to complete multipart upload. No parts have uploaded", ErrorCodes::S3_ERROR);
ProfileEvents::increment(ProfileEvents::CompleteS3MultipartUpload);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
@ -429,6 +436,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
{
ProfileEvents::increment(ProfileEvents::PutS3ObjectRequest);
auto outcome = client_ptr->PutObject(task.req);
bool with_pool = static_cast<bool>(schedule);
if (outcome.IsSuccess())

View File

@ -50,19 +50,15 @@ OutputBlockColumns prepareOutputBlockColumns(
if (aggregate_functions[i]->isState())
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
final_aggregate_columns[i]->forEachSubcolumn(
[&aggregates_pools](auto & subcolumn)
{
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
});
auto callback = [&](auto & subcolumn)
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
};
callback(final_aggregate_columns[i]);
final_aggregate_columns[i]->forEachSubcolumnRecursively(callback);
}
}
}

View File

@ -1754,8 +1754,11 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
* It is also tricky, because there are aggregate functions with "-State" modifier.
* When we call "insertResultInto" for them, they insert a pointer to the state to ColumnAggregateFunction
* and ColumnAggregateFunction will take ownership of this state.
* So, for aggregate functions with "-State" modifier, the state must not be destroyed
* after it has been transferred to ColumnAggregateFunction.
* So, for aggregate functions with "-State" modifier, only states of all combinators that are used
* after -State will be destroyed after result has been transferred to ColumnAggregateFunction.
* For example, if we have function `uniqStateForEachMap` after aggregation we should destroy all states that
* were created by combinators `-ForEach` and `-Map`, because resulting ColumnAggregateFunction will be
* responsible only for destruction of the states created by `uniq` function.
* But we should mark that the data no longer owns these states.
*/
@ -1778,8 +1781,8 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
/** Destroy states that are no longer needed. This loop does not throw.
*
* Don't destroy states for "-State" aggregate functions,
* because the ownership of this state is transferred to ColumnAggregateFunction
* For functions with -State combinator we destroy only states of all combinators that are used
* after -State, because the ownership of the rest states is transferred to ColumnAggregateFunction
* and ColumnAggregateFunction will take care.
*
* But it's only for states that has been transferred to ColumnAggregateFunction
@ -1787,10 +1790,10 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
*/
for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i)
{
/// If ownership was not transferred to ColumnAggregateFunction.
if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState()))
aggregate_functions[destroy_i]->destroy(
mapped + offsets_of_aggregate_states[destroy_i]);
if (destroy_i < insert_i)
aggregate_functions[destroy_i]->destroyUpToState(mapped + offsets_of_aggregate_states[destroy_i]);
else
aggregate_functions[destroy_i]->destroy(mapped + offsets_of_aggregate_states[destroy_i]);
}
/// Mark the cell as destroyed so it will not be destroyed in destructor.
@ -1855,12 +1858,7 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
size_t destroy_index = aggregate_functions_destroy_index;
++aggregate_functions_destroy_index;
/// For State AggregateFunction ownership of aggregate place is passed to result column after insert
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoBatch(
0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena);
}
}
catch (...)

View File

@ -78,15 +78,11 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
query_ptr->as<ASTAlterQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal
&& !alter.isAttachAlter()
&& !alter.isFetchAlter()
&& !alter.isDropPartitionAlter())
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());

View File

@ -38,11 +38,11 @@ BlockIO InterpreterCreateIndexQuery::execute()
query_ptr->as<ASTCreateIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && !current_context->getClientInfo().is_replicated_database_internal)
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return assert_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, current_context);
return database->tryEnqueueReplicatedDDL(query_ptr, current_context);
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, current_context);

View File

@ -1001,27 +1001,27 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String current_database = getContext()->getCurrentDatabase();
auto database_name = create.database ? create.getDatabase() : current_database;
DDLGuardPtr ddl_guard;
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns_list)
{
auto database = DatabaseCatalog::instance().getDatabase(database_name);
if (database->getEngineName() == "Replicated")
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable());
if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get());
ptr && !getContext()->getClientInfo().is_replicated_database_internal)
{
create.setDatabase(database_name);
guard->releaseTableLock();
return ptr->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
}
create.setDatabase(database_name);
guard->releaseTableLock();
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
}
if (!create.cluster.empty())
return executeQueryOnCluster(create);
/// For short syntax of ATTACH query we have to lock table name here, before reading metadata
/// and hold it until table is attached
ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable());
bool if_not_exists = create.if_not_exists;
// Table SQL definition is available even if the table is detached (even permanently)
@ -1053,6 +1053,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.attach_from_path)
{
chassert(!ddl_guard);
fs::path user_files = fs::path(getContext()->getUserFilesPath()).lexically_normal();
fs::path root_path = fs::path(getContext()->getPath()).lexically_normal();
@ -1145,27 +1146,30 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (need_add_to_database)
database = DatabaseCatalog::instance().getDatabase(database_name);
if (need_add_to_database && database->getEngineName() == "Replicated")
if (need_add_to_database && database->shouldReplicateQuery(getContext(), query_ptr))
{
chassert(!ddl_guard);
auto guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get());
ptr && !getContext()->getClientInfo().is_replicated_database_internal)
{
assertOrSetUUID(create, database);
guard->releaseTableLock();
return ptr->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
}
assertOrSetUUID(create, database);
guard->releaseTableLock();
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
}
if (!create.cluster.empty())
{
chassert(!ddl_guard);
return executeQueryOnCluster(create);
}
if (create.replace_table)
{
chassert(!ddl_guard);
return doCreateOrReplaceTable(create, properties);
}
/// Actually creates table
bool created = doCreateTable(create, properties);
bool created = doCreateTable(create, properties, ddl_guard);
ddl_guard.reset();
if (!created) /// Table already exists
return {};
@ -1180,7 +1184,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties)
const InterpreterCreateQuery::TableProperties & properties,
DDLGuardPtr & ddl_guard)
{
if (create.temporary)
{
@ -1193,16 +1198,12 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
return true;
}
std::unique_ptr<DDLGuard> guard;
if (!ddl_guard)
ddl_guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
String data_path;
DatabasePtr database;
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If table doesn't exist, one thread is creating table, while others wait in DDLGuard.
*/
guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
database = DatabaseCatalog::instance().getDatabase(create.getDatabase());
assertOrSetUUID(create, database);
@ -1411,7 +1412,9 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
try
{
/// Create temporary table (random name will be generated)
[[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties);
DDLGuardPtr ddl_guard;
[[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties, ddl_guard);
ddl_guard.reset();
assert(done);
created = true;

View File

@ -18,7 +18,9 @@ class ASTExpressionList;
class ASTConstraintDeclaration;
class ASTStorage;
class IDatabase;
class DDLGuard;
using DatabasePtr = std::shared_ptr<IDatabase>;
using DDLGuardPtr = std::unique_ptr<DDLGuard>;
/** Allows to create new table or database,
@ -89,7 +91,7 @@ private:
AccessRightsElements getRequiredAccess() const;
/// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false.
bool doCreateTable(ASTCreateQuery & create, const TableProperties & properties);
bool doCreateTable(ASTCreateQuery & create, const TableProperties & properties, DDLGuardPtr & ddl_guard);
BlockIO doCreateOrReplaceTable(ASTCreateQuery & create, const InterpreterCreateQuery::TableProperties & properties);
/// Inserts data in created table if it's CREATE ... SELECT
BlockIO fillTableIfNeeded(const ASTCreateQuery & create);

View File

@ -48,12 +48,11 @@ BlockIO InterpreterDeleteQuery::execute()
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);

View File

@ -36,11 +36,11 @@ BlockIO InterpreterDropIndexQuery::execute()
query_ptr->as<ASTDropIndexQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && !current_context->getClientInfo().is_replicated_database_internal)
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();
return assert_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, current_context);
return database->tryEnqueueReplicatedDDL(query_ptr, current_context);
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, current_context);

View File

@ -139,9 +139,6 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
/// Prevents recursive drop from drop database query. The original query must specify a table.
bool is_drop_or_detach_database = !query_ptr->as<ASTDropQuery>()->table;
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
!context_->getClientInfo().is_replicated_database_internal &&
!is_drop_or_detach_database;
AccessFlags drop_storage;
@ -152,7 +149,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
else
drop_storage = AccessType::DROP_TABLE;
if (is_replicated_ddl_query)
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
if (query.kind == ASTDropQuery::Kind::Detach)
context_->checkAccess(drop_storage, table_id);
@ -163,7 +160,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
ddl_guard->releaseTableLock();
table.reset();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query.clone(), context_);
return database->tryEnqueueReplicatedDDL(query.clone(), context_);
}
if (query.kind == ASTDropQuery::Kind::Detach)

View File

@ -107,7 +107,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
}
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && !getContext()->getClientInfo().is_replicated_database_internal)
if (database->shouldReplicateQuery(getContext(), query_ptr))
{
if (1 < descriptions.size())
throw Exception(
@ -120,7 +120,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
UniqueTableName to(elem.to_database_name, elem.to_table_name);
ddl_guards[from]->releaseTableLock();
ddl_guards[to]->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
else
{

View File

@ -562,12 +562,16 @@ bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context)
if (database_name != query_on_cluster->cluster)
return false;
auto db = DatabaseCatalog::instance().tryGetDatabase(database_name);
if (!db || db->getEngineName() != "Replicated")
return false;
auto database = DatabaseCatalog::instance().tryGetDatabase(database_name);
if (database && database->shouldReplicateQuery(context, query_ptr))
{
/// It's Replicated database and query is replicated on database level,
/// so ON CLUSTER clause is redundant.
query_on_cluster->cluster.clear();
return true;
}
query_on_cluster->cluster.clear();
return true;
return false;
}
}

View File

@ -1,4 +1,5 @@
#include <Common/config.h>
#include <Common/ProfileEvents.h>
#include "IO/ParallelReadBuffer.h"
#include "IO/IOThreadPool.h"
#include "Parsers/ASTCreateQuery.h"
@ -63,6 +64,12 @@ namespace fs = std::filesystem;
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
namespace ProfileEvents
{
extern const Event DeleteS3Objects;
extern const Event ListS3Objects;
}
namespace DB
{
@ -164,6 +171,7 @@ private:
{
buffer.clear();
ProfileEvents::increment(ProfileEvents::ListS3Objects);
outcome = client.ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
@ -559,6 +567,7 @@ static bool checkIfObjectExists(const std::shared_ptr<const Aws::S3::S3Client> &
request.SetPrefix(key);
while (!is_finished)
{
ProfileEvents::increment(ProfileEvents::ListS3Objects);
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
@ -1036,6 +1045,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
delkeys.AddObjects(std::move(obj));
}
ProfileEvents::increment(ProfileEvents::DeleteS3Objects);
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(s3_configuration.uri.bucket);
request.SetDelete(delkeys);

View File

@ -56,6 +56,17 @@ init_list = {
"DiskS3WriteRequestsErrorsTotal": 0,
"DiskS3WriteRequestsErrors503": 0,
"DiskS3WriteRequestsRedirects": 0,
"DeleteS3Objects": 0,
"CopyS3Object": 0,
"ListS3Objects": 0,
"HeadS3Object": 0,
"CreateS3MultipartUpload": 0,
"UploadS3PartCopy": 0,
"UploadS3Part": 0,
"AbortS3MultipartUpload": 0,
"CompleteS3MultipartUpload": 0,
"PutS3ObjectRequest": 0,
"GetS3ObjectRequest": 0,
}

View File

@ -354,6 +354,42 @@ def test_alter_drop_detached_part(started_cluster, engine):
dummy_node.query("DROP DATABASE testdb SYNC")
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
def test_alter_drop_partition(started_cluster, engine):
main_node.query(
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard1', 'replica1');"
)
dummy_node.query(
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard1', 'replica2');"
)
snapshotting_node.query(
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard2', 'replica1');"
)
table = f"alter_drop_partition.alter_drop_{engine}"
main_node.query(
f"CREATE TABLE {table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
)
main_node.query(f"INSERT INTO {table} VALUES (123)")
if engine == "MergeTree":
dummy_node.query(f"INSERT INTO {table} VALUES (456)")
snapshotting_node.query(f"INSERT INTO {table} VALUES (789)")
main_node.query(
f"ALTER TABLE {table} ON CLUSTER alter_drop_partition DROP PARTITION ID 'all'",
settings={"replication_alter_partitions_sync": 2},
)
assert (
main_node.query(
f"SELECT CounterID FROM clusterAllReplicas('alter_drop_partition', {table})"
)
== ""
)
assert dummy_node.query(f"SELECT CounterID FROM {table}") == ""
main_node.query("DROP DATABASE alter_drop_partition")
dummy_node.query("DROP DATABASE alter_drop_partition")
snapshotting_node.query("DROP DATABASE alter_drop_partition")
def test_alter_fetch(started_cluster):
main_node.query(
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT lowerUTF8(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
<query>SELECT upperUTF8(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
</test>

View File

@ -0,0 +1,11 @@
-- { echoOn }
-- NOTE: total string size should be > 16 (sizeof(__m128i))
insert into utf8_overlap values ('\xe2'), ('Foo⚊BarBazBam'), ('\xe2'), ('Foo⚊BarBazBam');
-- ^
-- MONOGRAM FOR YANG
with lowerUTF8(str) as l_, upperUTF8(str) as u_, '0x' || hex(str) as h_
select length(str), if(l_ == '\xe2', h_, l_), if(u_ == '\xe2', h_, u_) from utf8_overlap format CSV;
1,"0xE2","0xE2"
15,"foo⚊barbazbam","FOO⚊BARBAZBAM"
1,"0xE2","0xE2"
15,"foo⚊barbazbam","FOO⚊BARBAZBAM"

View File

@ -0,0 +1,10 @@
drop table if exists utf8_overlap;
create table utf8_overlap (str String) engine=Memory();
-- { echoOn }
-- NOTE: total string size should be > 16 (sizeof(__m128i))
insert into utf8_overlap values ('\xe2'), ('Foo⚊BarBazBam'), ('\xe2'), ('Foo⚊BarBazBam');
-- ^
-- MONOGRAM FOR YANG
with lowerUTF8(str) as l_, upperUTF8(str) as u_, '0x' || hex(str) as h_
select length(str), if(l_ == '\xe2', h_, l_), if(u_ == '\xe2', h_, u_) from utf8_overlap format CSV;

View File

@ -0,0 +1,10 @@
{1:'\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç'}
{1:[{1:['\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç','\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç']}]}
[['\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç','\0\nuð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhä\rÓH%uULÕsE|Ç'],[]]
[[{1:'\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç'}],[]]
['\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç']
10
{1:10}
{1:[{1:[10,10]}]}
[[10,10],[]]
[[{1:10}],[]]

View File

@ -0,0 +1,36 @@
select uniqStateMap(map(1, number)) from numbers(10);
select uniqStateForEachMapForEachMap(map(1, [map(1, [number, number])])) from numbers(10);
select uniqStateForEachResample(30, 75, 30)([number, number + 1], 30) from numbers(10);
select uniqStateMapForEachResample(30, 75, 30)([map(1, number)], 30) from numbers(10);
select uniqStateForEachMerge(x) as y from (select uniqStateForEachState([number]) as x from numbers(10));
select uniqMerge(y[1]) from (select uniqStateForEachMerge(x) as y from (select uniqStateForEachState([number]) as x from numbers(10)));
drop table if exists test;
create table test (x Map(UInt8, AggregateFunction(uniq, UInt64))) engine=Memory;
insert into test select uniqStateMap(map(1, number)) from numbers(10);
select * from test format Null;
select mapApply(k, v -> (k, finalizeAggregation(v)), x) from test;
truncate table test;
drop table test;
create table test (x Map(UInt8, Array(Map(UInt8, Array(AggregateFunction(uniq, UInt64)))))) engine=Memory;
insert into test select uniqStateForEachMapForEachMap(map(1, [map(1, [number, number])])) from numbers(10);
select mapApply(k, v -> (k, arrayMap(x -> mapApply(k, v -> (k, arrayMap(x -> finalizeAggregation(x), v)), x), v)), x) from test;
select * from test format Null;
truncate table test;
drop table test;
create table test (x Array(Array(AggregateFunction(uniq, UInt64)))) engine=Memory;
insert into test select uniqStateForEachResample(30, 75, 30)([number, number + 1], 30) from numbers(10);
select arrayMap(x -> arrayMap(x -> finalizeAggregation(x), x), x) from test;
select * from test format Null;
truncate table test;
drop table test;
create table test (x Array(Array(Map(UInt8, AggregateFunction(uniq, UInt64))))) engine=Memory;
insert into test select uniqStateMapForEachResample(30, 75, 30)([map(1, number)], 30) from numbers(10);
select arrayMap(x -> arrayMap(x -> mapApply(k, v -> (k, finalizeAggregation(v)), x), x), x) from test;
select * from test format Null;
truncate table test;
drop table test;