Commit 55fcb98f29 — Merge branch 'master' of github.com:ClickHouse/ClickHouse into BLAKE3
(mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-10 01:25:21 +00:00)
@@ -5,8 +5,9 @@
#include <type_traits>

/** \brief Returns value `from` converted to type `To` while retaining bit representation.
  * `To` and `From` must satisfy `CopyConstructible`.
/** Returns value `from` converted to type `To` while retaining bit representation.
  * `To` and `From` must satisfy `CopyConstructible`.
  * In contrast to std::bit_cast can cast types of different width.
  */
template <typename To, typename From>
std::decay_t<To> bit_cast(const From & from)
@@ -15,13 +16,3 @@ std::decay_t<To> bit_cast(const From & from)
    memcpy(static_cast<void*>(&res), &from, std::min(sizeof(res), sizeof(from)));
    return res;
}

/** \brief Returns value `from` converted to type `To` while retaining bit representation.
  * `To` and `From` must satisfy `CopyConstructible`.
  */
template <typename To, typename From>
std::decay_t<To> safe_bit_cast(const From & from)
{
    static_assert(sizeof(To) == sizeof(From), "bit cast on types of different width");
    return bit_cast<To, From>(from);
}
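For reference (not part of this commit), a minimal, self-contained sketch of how a width-tolerant bit_cast like the one above behaves; the example values and main() are illustrative assumptions:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <type_traits>

/// Stand-alone copy of the bit_cast shown above (assumed semantics): it copies
/// min(sizeof(To), sizeof(From)) bytes, so unlike std::bit_cast the widths may differ.
template <typename To, typename From>
std::decay_t<To> bit_cast(const From & from)
{
    std::decay_t<To> res{};
    std::memcpy(static_cast<void *>(&res), &from, std::min(sizeof(res), sizeof(from)));
    return res;
}

int main()
{
    float f = 1.5f;

    /// Same width: ordinary type punning, equivalent to std::bit_cast<std::uint32_t>(f).
    auto bits32 = bit_cast<std::uint32_t>(f);

    /// Wider destination: only sizeof(float) bytes are copied; the rest of the
    /// zero-initialized result stays zero (so on a little-endian machine the low half equals bits32).
    auto bits64 = bit_cast<std::uint64_t>(f);

    return (bits32 == static_cast<std::uint32_t>(bits64)) ? 0 : 1;
}
```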
contrib/krb5 (vendored)
@@ -1 +1 @@
Subproject commit d879821c7a4c70b0c3ad739d9951d1a2b1903df7
Subproject commit b89e20367b074bd02dd118a6534099b21e88b3c3
@@ -3433,7 +3433,7 @@ Possible values:

- 0 — Disabled.
- 1 — Enabled.

Default value: 0.
Default value: 1.

## input_format_with_names_use_header {#input_format_with_names_use_header}
@@ -155,7 +155,7 @@ Example of configuration for versions earlier than 22.8:
            <endpoint>...</endpoint>
            ... s3 configuration ...
            <data_cache_enabled>1</data_cache_enabled>
            <data_cache_size>10000000</data_cache_size>
            <data_cache_max_size>10000000</data_cache_max_size>
        </s3>
    </disks>
    <policies>
@@ -7,13 +7,8 @@ title: "Geo Data Types"

ClickHouse supports data types for representing geographical objects — locations, lands, etc.

:::warning
Currently geo data types are an experimental feature. To work with them you must set `allow_experimental_geo_types = 1`.
:::

**See Also**
- [Representing simple geographical features](https://en.wikipedia.org/wiki/GeoJSON).
- [allow_experimental_geo_types](../../operations/settings/settings.md#allow-experimental-geo-types) setting.

## Point

@@ -24,7 +19,6 @@ Currently geo data types are an experimental feature. To work with them you must
Query:

```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_point (p Point) ENGINE = Memory();
INSERT INTO geo_point VALUES((10, 10));
SELECT p, toTypeName(p) FROM geo_point;
@@ -46,7 +40,6 @@ Result:
Query:

```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_ring (r Ring) ENGINE = Memory();
INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)]);
SELECT r, toTypeName(r) FROM geo_ring;
@@ -68,7 +61,6 @@ Result:
This is a polygon with one hole:

```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_polygon (pg Polygon) ENGINE = Memory();
INSERT INTO geo_polygon VALUES([[(20, 20), (50, 20), (50, 50), (20, 50)], [(30, 30), (50, 50), (50, 30)]]);
SELECT pg, toTypeName(pg) FROM geo_polygon;
@@ -91,7 +83,6 @@ Result:
This multipolygon consists of two separate polygons — the first one without holes, and the second with one hole:

```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_multipolygon (mpg MultiPolygon) ENGINE = Memory();
INSERT INTO geo_multipolygon VALUES([[[(0, 0), (10, 0), (10, 10), (0, 10)]], [[(20, 20), (50, 20), (50, 50), (20, 50)],[(30, 30), (50, 50), (50, 30)]]]);
SELECT mpg, toTypeName(mpg) FROM geo_multipolygon;
@ -64,6 +64,11 @@ public:
|
||||
return nested_func->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_func->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_func->getDefaultVersion();
|
||||
@ -79,6 +84,11 @@ public:
|
||||
nested_func->destroy(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
nested_func->destroyUpToState(place);
|
||||
}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_func->hasTrivialDestructor();
|
||||
|
@ -35,12 +35,12 @@ private:
|
||||
using Counter = UInt64;
|
||||
size_t category_count;
|
||||
|
||||
Counter & counter(AggregateDataPtr __restrict place, size_t i, bool what) const
|
||||
static Counter & counter(AggregateDataPtr __restrict place, size_t i, bool what)
|
||||
{
|
||||
return reinterpret_cast<Counter *>(place)[i * 2 + (what ? 1 : 0)];
|
||||
}
|
||||
|
||||
const Counter & counter(ConstAggregateDataPtr __restrict place, size_t i, bool what) const
|
||||
static const Counter & counter(ConstAggregateDataPtr __restrict place, size_t i, bool what)
|
||||
{
|
||||
return reinterpret_cast<const Counter *>(place)[i * 2 + (what ? 1 : 0)];
|
||||
}
|
||||
|
@ -225,6 +225,12 @@ public:
|
||||
nested_func->destroy(getNestedPlace(place));
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
this->data(place).~Data();
|
||||
nested_func->destroyUpToState(getNestedPlace(place));
|
||||
}
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
return nested_func->getName() + "Distinct";
|
||||
@ -245,6 +251,21 @@ public:
|
||||
return nested_func->isState();
|
||||
}
|
||||
|
||||
bool isVersioned() const override
|
||||
{
|
||||
return nested_func->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_func->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_func->getDefaultVersion();
|
||||
}
|
||||
|
||||
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
|
||||
};
|
||||
|
||||
|
@ -66,6 +66,7 @@ private:
|
||||
if (old_size < new_size)
|
||||
{
|
||||
char * old_state = state.array_of_aggregate_datas;
|
||||
|
||||
char * new_state = arena.alignedAlloc(
|
||||
new_size * nested_size_of_data,
|
||||
nested_func->alignOfData());
|
||||
@ -134,23 +135,43 @@ public:
|
||||
return nested_func->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_func->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_func->getDefaultVersion();
|
||||
}
|
||||
|
||||
void destroy(AggregateDataPtr __restrict place) const noexcept override
|
||||
template <bool up_to_state>
|
||||
void destroyImpl(AggregateDataPtr __restrict place) const noexcept
|
||||
{
|
||||
AggregateFunctionForEachData & state = data(place);
|
||||
|
||||
char * nested_state = state.array_of_aggregate_datas;
|
||||
for (size_t i = 0; i < state.dynamic_array_size; ++i)
|
||||
{
|
||||
nested_func->destroy(nested_state);
|
||||
if constexpr (up_to_state)
|
||||
nested_func->destroyUpToState(nested_state);
|
||||
else
|
||||
nested_func->destroy(nested_state);
|
||||
|
||||
nested_state += nested_size_of_data;
|
||||
}
|
||||
}
|
||||
|
||||
void destroy(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
destroyImpl<false>(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
destroyImpl<true>(place);
|
||||
}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_func->hasTrivialDestructor();
|
||||
|
@ -71,6 +71,11 @@ public:
|
||||
return nested_func->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_func->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_func->getDefaultVersion();
|
||||
@ -86,6 +91,11 @@ public:
|
||||
nested_func->destroy(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
nested_func->destroyUpToState(place);
|
||||
}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_func->hasTrivialDestructor();
|
||||
|
@ -84,6 +84,26 @@ private:
|
||||
using Base = IAggregateFunctionDataHelper<Data, AggregateFunctionMap<KeyType>>;
|
||||
|
||||
public:
|
||||
bool isState() const override
|
||||
{
|
||||
return nested_func->isState();
|
||||
}
|
||||
|
||||
bool isVersioned() const override
|
||||
{
|
||||
return nested_func->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_func->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_func->getDefaultVersion();
|
||||
}
|
||||
|
||||
AggregateFunctionMap(AggregateFunctionPtr nested, const DataTypes & types) : Base(types, nested->getParameters()), nested_func(nested)
|
||||
{
|
||||
if (types.empty())
|
||||
@ -187,6 +207,32 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
template <bool up_to_state>
|
||||
void destroyImpl(AggregateDataPtr __restrict place) const noexcept
|
||||
{
|
||||
AggregateFunctionMapCombinatorData<KeyType> & state = Base::data(place);
|
||||
|
||||
for (const auto & [key, nested_place] : state.merged_maps)
|
||||
{
|
||||
if constexpr (up_to_state)
|
||||
nested_func->destroyUpToState(nested_place);
|
||||
else
|
||||
nested_func->destroy(nested_place);
|
||||
}
|
||||
|
||||
state.~Data();
|
||||
}
|
||||
|
||||
void destroy(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
destroyImpl<false>(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
destroyImpl<true>(place);
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
|
||||
{
|
||||
auto & merged_maps = this->data(place).merged_maps;
|
||||
|
@ -80,6 +80,11 @@ public:
|
||||
nested_func->destroy(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
nested_func->destroyUpToState(place);
|
||||
}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_func->hasTrivialDestructor();
|
||||
@ -126,6 +131,11 @@ public:
|
||||
}
|
||||
|
||||
AggregateFunctionPtr getNestedFunction() const override { return nested_func; }
|
||||
|
||||
bool isState() const override
|
||||
{
|
||||
return nested_func->isState();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -114,6 +114,11 @@ public:
|
||||
nested_function->destroy(nestedPlace(place));
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
nested_function->destroyUpToState(nestedPlace(place));
|
||||
}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_function->hasTrivialDestructor();
|
||||
@ -189,6 +194,21 @@ public:
|
||||
return nested_function->isState();
|
||||
}
|
||||
|
||||
bool isVersioned() const override
|
||||
{
|
||||
return nested_function->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_function->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_function->getDefaultVersion();
|
||||
}
|
||||
|
||||
AggregateFunctionPtr getNestedFunction() const override { return nested_function; }
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
|
@ -98,6 +98,11 @@ public:
|
||||
nested_function->destroy(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
nested_function->destroyUpToState(place);
|
||||
}
|
||||
|
||||
void add(
|
||||
AggregateDataPtr __restrict place,
|
||||
const IColumn ** columns,
|
||||
|
@ -91,6 +91,21 @@ public:
|
||||
return nested_function->isState();
|
||||
}
|
||||
|
||||
bool isVersioned() const override
|
||||
{
|
||||
return nested_function->isVersioned();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_function->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
size_t getDefaultVersion() const override
|
||||
{
|
||||
return nested_function->getDefaultVersion();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override
|
||||
{
|
||||
return nested_function->allocatesMemoryInArena();
|
||||
@ -134,6 +149,12 @@ public:
|
||||
nested_function->destroy(place + i * size_of_data);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override
|
||||
{
|
||||
for (size_t i = 0; i < total; ++i)
|
||||
nested_function->destroyUpToState(place + i * size_of_data);
|
||||
}
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
|
||||
{
|
||||
Key key;
|
||||
|
@ -56,10 +56,22 @@ public:
|
||||
return nested_func->getDefaultVersion();
|
||||
}
|
||||
|
||||
bool isState() const override
|
||||
{
|
||||
return nested_func->isState();
|
||||
}
|
||||
|
||||
size_t getVersionFromRevision(size_t revision) const override
|
||||
{
|
||||
return nested_func->getVersionFromRevision(revision);
|
||||
}
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override { nested_func->create(place); }
|
||||
|
||||
void destroy(AggregateDataPtr __restrict place) const noexcept override { nested_func->destroy(place); }
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict place) const noexcept override { nested_func->destroyUpToState(place); }
|
||||
|
||||
bool hasTrivialDestructor() const override { return nested_func->hasTrivialDestructor(); }
|
||||
|
||||
size_t sizeOfData() const override { return nested_func->sizeOfData(); }
|
||||
|
@ -69,6 +69,8 @@ public:
|
||||
nested_func->destroy(place);
|
||||
}
|
||||
|
||||
void destroyUpToState(AggregateDataPtr __restrict) const noexcept override {}
|
||||
|
||||
bool hasTrivialDestructor() const override
|
||||
{
|
||||
return nested_func->hasTrivialDestructor();
|
||||
|
@ -113,6 +113,17 @@ public:
|
||||
/// Delete data for aggregation.
|
||||
virtual void destroy(AggregateDataPtr __restrict place) const noexcept = 0;
|
||||
|
||||
/// Delete all combinator states that were used after combinator -State.
|
||||
/// For example for uniqArrayStateForEachMap(...) it will destroy
|
||||
/// states that were created by combinators Map and ForEach.
|
||||
/// It's needed because ColumnAggregateFunction in the result will be
|
||||
/// responsible only for destruction of states that were created
|
||||
/// by aggregate function and all combinators before -State combinator.
|
||||
virtual void destroyUpToState(AggregateDataPtr __restrict place) const noexcept
|
||||
{
|
||||
destroy(place);
|
||||
}
|
||||
|
||||
/// It is not necessary to delete data.
|
||||
virtual bool hasTrivialDestructor() const = 0;
|
||||
|
||||
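Many of the combinator hunks in this commit repeat one idea: forward destroyUpToState to the nested function so that states created by the aggregate function and the combinators before -State stay alive for the result column. A condensed, hypothetical sketch of that pattern (simplified names, not the actual ClickHouse interface):

```cpp
#include <cstddef>

/// Sketch of the destroy / destroyUpToState split implemented by the combinators in this diff.
using AggregateDataPtr = char *;

struct INestedFunction
{
    virtual ~INestedFunction() = default;
    virtual void destroy(AggregateDataPtr place) const noexcept = 0;

    /// By default, destroying "up to -State" is a full destroy; a -State combinator
    /// overrides this so the states it owns are left alive for the result column.
    virtual void destroyUpToState(AggregateDataPtr place) const noexcept { destroy(place); }
};

struct ForEachLikeCombinator
{
    const INestedFunction * nested = nullptr;
    std::size_t element_count = 0;
    std::size_t nested_size_of_data = 0;

    /// Single implementation parameterized by whether we destroy fully or only up to -State,
    /// mirroring destroyImpl<bool up_to_state> in AggregateFunctionForEach / AggregateFunctionMap.
    template <bool up_to_state>
    void destroyImpl(AggregateDataPtr place) const noexcept
    {
        char * nested_state = place;
        for (std::size_t i = 0; i < element_count; ++i)
        {
            if constexpr (up_to_state)
                nested->destroyUpToState(nested_state);
            else
                nested->destroy(nested_state);
            nested_state += nested_size_of_data;
        }
    }

    void destroy(AggregateDataPtr place) const noexcept { destroyImpl<false>(place); }
    void destroyUpToState(AggregateDataPtr place) const noexcept { destroyImpl<true>(place); }
};
```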
@ -277,8 +288,7 @@ public:
|
||||
Arena * arena) const = 0;
|
||||
|
||||
/** Insert result of aggregate function into result column with batch size.
|
||||
* If destroy_place_after_insert is true. Then implementation of this method
|
||||
* must destroy aggregate place if insert state into result column was successful.
|
||||
* The implementation of this method will destroy aggregate place up to -State if insert state into result column was successful.
|
||||
* All places that were not inserted must be destroyed if there was exception during insert into result column.
|
||||
*/
|
||||
virtual void insertResultIntoBatch(
|
||||
@ -287,8 +297,7 @@ public:
|
||||
AggregateDataPtr * places,
|
||||
size_t place_offset,
|
||||
IColumn & to,
|
||||
Arena * arena,
|
||||
bool destroy_place_after_insert) const = 0;
|
||||
Arena * arena) const = 0;
|
||||
|
||||
/** Destroy batch of aggregate places.
|
||||
*/
|
||||
@ -612,8 +621,7 @@ public:
|
||||
AggregateDataPtr * places,
|
||||
size_t place_offset,
|
||||
IColumn & to,
|
||||
Arena * arena,
|
||||
bool destroy_place_after_insert) const override
|
||||
Arena * arena) const override
|
||||
{
|
||||
size_t batch_index = row_begin;
|
||||
|
||||
@ -622,9 +630,9 @@ public:
|
||||
for (; batch_index < row_end; ++batch_index)
|
||||
{
|
||||
static_cast<const Derived *>(this)->insertResultInto(places[batch_index] + place_offset, to, arena);
|
||||
|
||||
if (destroy_place_after_insert)
|
||||
static_cast<const Derived *>(this)->destroy(places[batch_index] + place_offset);
|
||||
/// For State AggregateFunction ownership of aggregate place is passed to result column after insert,
|
||||
/// so we need to destroy all states up to state of -State combinator.
|
||||
static_cast<const Derived *>(this)->destroyUpToState(places[batch_index] + place_offset);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
|
@ -137,9 +137,41 @@ Field QueryFuzzer::fuzzField(Field field)
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (type == Field::Types::Array || type == Field::Types::Tuple)
|
||||
else if (type == Field::Types::Array)
|
||||
{
|
||||
auto & arr = field.reinterpret<FieldVector>();
|
||||
auto & arr = field.get<Array>();
|
||||
|
||||
if (fuzz_rand() % 5 == 0 && !arr.empty())
|
||||
{
|
||||
size_t pos = fuzz_rand() % arr.size();
|
||||
arr.erase(arr.begin() + pos);
|
||||
std::cerr << "erased\n";
|
||||
}
|
||||
|
||||
if (fuzz_rand() % 5 == 0)
|
||||
{
|
||||
if (!arr.empty())
|
||||
{
|
||||
size_t pos = fuzz_rand() % arr.size();
|
||||
arr.insert(arr.begin() + pos, fuzzField(arr[pos]));
|
||||
std::cerr << fmt::format("inserted (pos {})\n", pos);
|
||||
}
|
||||
else
|
||||
{
|
||||
arr.insert(arr.begin(), getRandomField(0));
|
||||
std::cerr << "inserted (0)\n";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for (auto & element : arr)
|
||||
{
|
||||
element = fuzzField(element);
|
||||
}
|
||||
}
|
||||
else if (type == Field::Types::Tuple)
|
||||
{
|
||||
auto & arr = field.get<Tuple>();
|
||||
|
||||
if (fuzz_rand() % 5 == 0 && !arr.empty())
|
||||
{
|
||||
|
@ -162,7 +162,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
|
||||
};
|
||||
|
||||
callback(res);
|
||||
res->forEachSubcolumn(callback);
|
||||
res->forEachSubcolumnRecursively(callback);
|
||||
|
||||
for (auto * val : data)
|
||||
func->insertResultInto(val, *res, &column_aggregate_func.createOrGetArena());
|
||||
|
@ -157,6 +157,14 @@ public:
|
||||
callback(data);
|
||||
}
|
||||
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override
|
||||
{
|
||||
callback(offsets);
|
||||
offsets->forEachSubcolumnRecursively(callback);
|
||||
callback(data);
|
||||
data->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override
|
||||
{
|
||||
if (const auto * rhs_concrete = typeid_cast<const ColumnArray *>(&rhs))
|
||||
|
@ -235,6 +235,12 @@ public:
|
||||
callback(data);
|
||||
}
|
||||
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override
|
||||
{
|
||||
callback(data);
|
||||
data->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override
|
||||
{
|
||||
if (const auto * rhs_concrete = typeid_cast<const ColumnConst *>(&rhs))
|
||||
|
@ -173,6 +173,19 @@ public:
|
||||
callback(dictionary.getColumnUniquePtr());
|
||||
}
|
||||
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override
|
||||
{
|
||||
callback(idx.getPositionsPtr());
|
||||
idx.getPositionsPtr()->forEachSubcolumnRecursively(callback);
|
||||
|
||||
/// Column doesn't own dictionary if it's shared.
|
||||
if (!dictionary.isShared())
|
||||
{
|
||||
callback(dictionary.getColumnUniquePtr());
|
||||
dictionary.getColumnUniquePtr()->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
}
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override
|
||||
{
|
||||
if (const auto * rhs_low_cardinality = typeid_cast<const ColumnLowCardinality *>(&rhs))
|
||||
|
@ -278,6 +278,12 @@ void ColumnMap::forEachSubcolumn(ColumnCallback callback)
|
||||
callback(nested);
|
||||
}
|
||||
|
||||
void ColumnMap::forEachSubcolumnRecursively(ColumnCallback callback)
|
||||
{
|
||||
callback(nested);
|
||||
nested->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
|
||||
bool ColumnMap::structureEquals(const IColumn & rhs) const
|
||||
{
|
||||
if (const auto * rhs_map = typeid_cast<const ColumnMap *>(&rhs))
|
||||
|
@ -89,6 +89,7 @@ public:
|
||||
size_t allocatedBytes() const override;
|
||||
void protect() override;
|
||||
void forEachSubcolumn(ColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
double getRatioOfDefaultRows(double sample_ratio) const override;
|
||||
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
|
||||
|
@ -136,6 +136,14 @@ public:
|
||||
callback(null_map);
|
||||
}
|
||||
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override
|
||||
{
|
||||
callback(nested_column);
|
||||
nested_column->forEachSubcolumnRecursively(callback);
|
||||
callback(null_map);
|
||||
null_map->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override
|
||||
{
|
||||
if (const auto * rhs_nullable = typeid_cast<const ColumnNullable *>(&rhs))
|
||||
|
@ -671,6 +671,18 @@ void ColumnObject::forEachSubcolumn(ColumnCallback callback)
|
||||
callback(part);
|
||||
}
|
||||
|
||||
void ColumnObject::forEachSubcolumnRecursively(ColumnCallback callback)
|
||||
{
|
||||
for (auto & entry : subcolumns)
|
||||
{
|
||||
for (auto & part : entry->data.data)
|
||||
{
|
||||
callback(part);
|
||||
part->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ColumnObject::insert(const Field & field)
|
||||
{
|
||||
const auto & object = field.get<const Object &>();
|
||||
|
@ -211,6 +211,7 @@ public:
|
||||
size_t byteSize() const override;
|
||||
size_t allocatedBytes() const override;
|
||||
void forEachSubcolumn(ColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override;
|
||||
void insert(const Field & field) override;
|
||||
void insertDefault() override;
|
||||
void insertFrom(const IColumn & src, size_t n) override;
|
||||
|
@ -750,6 +750,14 @@ void ColumnSparse::forEachSubcolumn(ColumnCallback callback)
|
||||
callback(offsets);
|
||||
}
|
||||
|
||||
void ColumnSparse::forEachSubcolumnRecursively(ColumnCallback callback)
|
||||
{
|
||||
callback(values);
|
||||
values->forEachSubcolumnRecursively(callback);
|
||||
callback(offsets);
|
||||
offsets->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
|
||||
const IColumn::Offsets & ColumnSparse::getOffsetsData() const
|
||||
{
|
||||
return assert_cast<const ColumnUInt64 &>(*offsets).getData();
|
||||
|
@ -140,6 +140,7 @@ public:
|
||||
ColumnPtr compress() const override;
|
||||
|
||||
void forEachSubcolumn(ColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override;
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
|
||||
|
@ -501,6 +501,15 @@ void ColumnTuple::forEachSubcolumn(ColumnCallback callback)
|
||||
callback(column);
|
||||
}
|
||||
|
||||
void ColumnTuple::forEachSubcolumnRecursively(ColumnCallback callback)
|
||||
{
|
||||
for (auto & column : columns)
|
||||
{
|
||||
callback(column);
|
||||
column->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
}
|
||||
|
||||
bool ColumnTuple::structureEquals(const IColumn & rhs) const
|
||||
{
|
||||
if (const auto * rhs_tuple = typeid_cast<const ColumnTuple *>(&rhs))
|
||||
|
@ -97,6 +97,7 @@ public:
|
||||
size_t allocatedBytes() const override;
|
||||
void protect() override;
|
||||
void forEachSubcolumn(ColumnCallback callback) override;
|
||||
void forEachSubcolumnRecursively(ColumnCallback callback) override;
|
||||
bool structureEquals(const IColumn & rhs) const override;
|
||||
bool isCollationSupported() const override;
|
||||
ColumnPtr compress() const override;
|
||||
|
@ -1,4 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <Columns/IColumnUnique.h>
|
||||
#include <Columns/IColumnImpl.h>
|
||||
#include <Columns/ReverseIndex.h>
|
||||
@ -7,16 +8,17 @@
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnFixedString.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/NumberTraits.h>
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <base/range.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
|
||||
#include <base/range.h>
|
||||
#include <base/unaligned.h>
|
||||
#include "Columns/ColumnConst.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -111,6 +113,15 @@ public:
|
||||
nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
|
||||
}
|
||||
|
||||
void forEachSubcolumnRecursively(IColumn::ColumnCallback callback) override
|
||||
{
|
||||
callback(column_holder);
|
||||
column_holder->forEachSubcolumnRecursively(callback);
|
||||
reverse_index.setColumn(getRawColumnPtr());
|
||||
if (is_nullable)
|
||||
nested_column_nullable = ColumnNullable::create(column_holder, nested_null_mask);
|
||||
}
|
||||
|
||||
bool structureEquals(const IColumn & rhs) const override
|
||||
{
|
||||
if (auto rhs_concrete = typeid_cast<const ColumnUnique *>(&rhs))
|
||||
@ -305,17 +316,52 @@ size_t ColumnUnique<ColumnType>::getNullValueIndex() const
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
class FieldVisitorGetData : public StaticVisitor<>
|
||||
{
|
||||
public:
|
||||
StringRef res;
|
||||
|
||||
[[noreturn]] static void throwUnsupported()
|
||||
{
|
||||
throw Exception("Unsupported field type", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
[[noreturn]] void operator() (const Null &) { throwUnsupported(); }
|
||||
[[noreturn]] void operator() (const Array &) { throwUnsupported(); }
|
||||
[[noreturn]] void operator() (const Tuple &) { throwUnsupported(); }
|
||||
[[noreturn]] void operator() (const Map &) { throwUnsupported(); }
|
||||
[[noreturn]] void operator() (const Object &) { throwUnsupported(); }
|
||||
[[noreturn]] void operator() (const AggregateFunctionStateData &) { throwUnsupported(); }
|
||||
void operator() (const String & x) { res = {x.data(), x.size()}; }
|
||||
void operator() (const UInt64 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const UInt128 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const UInt256 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const Int64 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const Int128 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const Int256 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const UUID & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const Float64 & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const DecimalField<Decimal32> & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const DecimalField<Decimal64> & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const DecimalField<Decimal128> & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const DecimalField<Decimal256> & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
void operator() (const bool & x) { res = {reinterpret_cast<const char *>(&x), sizeof(x)}; }
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
template <typename ColumnType>
|
||||
size_t ColumnUnique<ColumnType>::uniqueInsert(const Field & x)
|
||||
{
|
||||
if (x.isNull())
|
||||
return getNullValueIndex();
|
||||
|
||||
if (valuesHaveFixedSize())
|
||||
return uniqueInsertData(&x.reinterpret<char>(), size_of_value_if_fixed);
|
||||
|
||||
const auto & val = x.get<String>();
|
||||
return uniqueInsertData(val.data(), val.size());
|
||||
FieldVisitorGetData visitor;
|
||||
applyVisitor(visitor, x);
|
||||
return uniqueInsertData(visitor.res.data, visitor.res.size);
|
||||
}
|
||||
|
||||
template <typename ColumnType>
|
||||
|
@ -414,6 +414,9 @@ public:
|
||||
using ColumnCallback = std::function<void(WrappedPtr&)>;
|
||||
virtual void forEachSubcolumn(ColumnCallback) {}
|
||||
|
||||
/// Similar to forEachSubcolumn but it also do recursive calls.
|
||||
virtual void forEachSubcolumnRecursively(ColumnCallback) {}
|
||||
|
||||
/// Columns have equal structure.
|
||||
/// If true - you can use "compareAt", "insertFrom", etc. methods.
|
||||
[[nodiscard]] virtual bool structureEquals(const IColumn &) const
|
||||
|
@ -94,21 +94,7 @@ public:
|
||||
T operator() (const DecimalField<U> & x) const
|
||||
{
|
||||
if constexpr (std::is_floating_point_v<T>)
|
||||
return x.getValue(). template convertTo<T>() / x.getScaleMultiplier(). template convertTo<T>();
|
||||
else if constexpr (std::is_same_v<T, UInt128>)
|
||||
{
|
||||
if constexpr (sizeof(U) < 16)
|
||||
{
|
||||
return UInt128(0, (x.getValue() / x.getScaleMultiplier()).value);
|
||||
}
|
||||
else if constexpr (sizeof(U) == 16)
|
||||
{
|
||||
auto tmp = (x.getValue() / x.getScaleMultiplier()).value;
|
||||
return UInt128(tmp >> 64, UInt64(tmp));
|
||||
}
|
||||
else
|
||||
throw Exception("No conversion to old UInt128 from " + demangle(typeid(U).name()), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
return x.getValue().template convertTo<T>() / x.getScaleMultiplier().template convertTo<T>();
|
||||
else
|
||||
return (x.getValue() / x.getScaleMultiplier()). template convertTo<T>();
|
||||
}
|
||||
@ -134,4 +120,3 @@ public:
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ FieldVisitorSum::FieldVisitorSum(const Field & rhs_) : rhs(rhs_) {}
|
||||
bool FieldVisitorSum::operator() (Int64 & x) const { return this->operator()(reinterpret_cast<UInt64 &>(x)); }
|
||||
bool FieldVisitorSum::operator() (UInt64 & x) const
|
||||
{
|
||||
x += rhs.reinterpret<UInt64>();
|
||||
x += applyVisitor(FieldVisitorConvertToNumber<UInt64>(), rhs);
|
||||
return x != 0;
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/FieldVisitors.h>
|
||||
#include <Common/FieldVisitorConvertToNumber.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -41,7 +42,7 @@ public:
|
||||
requires is_big_int_v<T>
|
||||
bool operator() (T & x) const
|
||||
{
|
||||
x += rhs.reinterpret<T>();
|
||||
x += applyVisitor(FieldVisitorConvertToNumber<T>(), rhs);
|
||||
return x != T(0);
|
||||
}
|
||||
};
|
||||
|
@@ -115,7 +115,13 @@ protected:
    }

    /// Minimum amount of memory to allocate for num_elements, including padding.
    static size_t minimum_memory_for_elements(size_t num_elements) { return byte_size(num_elements) + pad_right + pad_left; } /// NOLINT
    static size_t minimum_memory_for_elements(size_t num_elements)
    {
        size_t amount;
        if (__builtin_add_overflow(byte_size(num_elements), pad_left + pad_right, &amount))
            throw Exception("Amount of memory requested to allocate is more than allowed", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
        return amount;
    }

    void alloc_for_num_elements(size_t num_elements) /// NOLINT
    {
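A stand-alone sketch of the same overflow-checked size computation, assuming the GCC/Clang builtins; the padding constants and the element-size parameter are illustrative, not the real PODArray values:

```cpp
#include <cstddef>
#include <stdexcept>

constexpr std::size_t pad_left = 16;   /// illustrative padding values
constexpr std::size_t pad_right = 15;

std::size_t byte_size(std::size_t num_elements, std::size_t element_size)
{
    std::size_t bytes;
    /// __builtin_mul_overflow returns true when the multiplication wraps.
    if (__builtin_mul_overflow(num_elements, element_size, &bytes))
        throw std::length_error("too many elements");
    return bytes;
}

std::size_t minimum_memory_for_elements(std::size_t num_elements, std::size_t element_size)
{
    std::size_t amount;
    /// Without the check, a huge num_elements could wrap to a small value and a later
    /// resize() would silently under-allocate; here the request throws instead.
    if (__builtin_add_overflow(byte_size(num_elements, element_size), pad_left + pad_right, &amount))
        throw std::length_error("amount of memory requested to allocate is more than allowed");
    return amount;
}
```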
|
@ -410,6 +410,17 @@ The server successfully detected this situation and will download merged part fr
|
||||
M(OverflowBreak, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'break' and the result is incomplete.") \
|
||||
M(OverflowThrow, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'throw' and exception was thrown.") \
|
||||
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
|
||||
M(DeleteS3Objects, "Number of s3 API DeleteObjects be called") \
|
||||
M(CopyS3Object, "Number of s3 API CopyObject be called") \
|
||||
M(ListS3Objects, "Number of s3 API ListObjects be called") \
|
||||
M(HeadS3Object, "Number of s3 API HeadObject be called") \
|
||||
M(CreateS3MultipartUpload, "Number of s3 API CreateMultipartUpload be called") \
|
||||
M(UploadS3PartCopy, "Number of s3 API UploadPartCopy be called") \
|
||||
M(UploadS3Part, "Number of s3 API UploadS3Part be called") \
|
||||
M(AbortS3MultipartUpload, "Number of s3 API AbortMultipartUpload be called") \
|
||||
M(CompleteS3MultipartUpload, "Number of s3 API CompleteS3MultipartUpload be called") \
|
||||
M(PutS3ObjectRequest, "Number of s3 API PutS3ObjectRequest be called") \
|
||||
M(GetS3ObjectRequest, "Number of s3 API GetS3ObjectRequest be called")
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
@ -370,7 +370,7 @@ void KeeperDispatcher::shutdown()
|
||||
/// Clear all registered sessions
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
|
||||
if (hasLeader())
|
||||
if (server && hasLeader())
|
||||
{
|
||||
close_requests.reserve(session_to_response_callback.size());
|
||||
// send to leader CLOSE requests for active sessions
|
||||
@ -394,7 +394,7 @@ void KeeperDispatcher::shutdown()
|
||||
}
|
||||
|
||||
// if there is no leader, there is no reason to do CLOSE because it's a write request
|
||||
if (hasLeader() && !close_requests.empty())
|
||||
if (server && hasLeader() && !close_requests.empty())
|
||||
{
|
||||
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
|
||||
const auto raft_result = server->putRequestBatch(close_requests);
|
||||
|
@ -520,7 +520,7 @@ bool KeeperServer::isFollower() const
|
||||
|
||||
bool KeeperServer::isLeaderAlive() const
|
||||
{
|
||||
return raft_instance->is_leader_alive();
|
||||
return raft_instance && raft_instance->is_leader_alive();
|
||||
}
|
||||
|
||||
/// TODO test whether taking failed peer in count
|
||||
|
@ -425,16 +425,6 @@ public:
|
||||
bool isNegativeInfinity() const { return which == Types::Null && get<Null>().isNegativeInfinity(); }
|
||||
bool isPositiveInfinity() const { return which == Types::Null && get<Null>().isPositiveInfinity(); }
|
||||
|
||||
template <typename T>
|
||||
T & reinterpret();
|
||||
|
||||
template <typename T>
|
||||
const T & reinterpret() const
|
||||
{
|
||||
auto * mutable_this = const_cast<std::decay_t<decltype(*this)> *>(this);
|
||||
return mutable_this->reinterpret<T>();
|
||||
}
|
||||
|
||||
template <typename T> bool tryGet(T & result)
|
||||
{
|
||||
const Types::Which requested = TypeToEnum<std::decay_t<T>>::value;
|
||||
@@ -552,7 +542,7 @@ public:
        case Types::Float64:
        {
            // Compare as UInt64 so that NaNs compare as equal.
            return reinterpret<UInt64>() == rhs.reinterpret<UInt64>();
            return std::bit_cast<UInt64>(get<Float64>()) == std::bit_cast<UInt64>(rhs.get<Float64>());
        }
        case Types::UUID: return get<UUID>() == rhs.get<UUID>();
        case Types::String: return get<String>() == rhs.get<String>();
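A small, hypothetical C++20 example of why bitwise comparison is used here (not part of the commit):

```cpp
#include <bit>
#include <cassert>
#include <cmath>
#include <cstdint>

/// Illustrative only: why Field::operator== compares Float64 values through their bit pattern.
int main()
{
    double a = std::nan("");
    double b = a;

    /// IEEE-754 equality: NaN != NaN, so two byte-identical NaN fields would compare unequal.
    assert(!(a == b));

    /// Comparing the raw 64-bit patterns (as the updated code does via std::bit_cast)
    /// treats byte-identical values, including NaNs, as equal.
    assert(std::bit_cast<std::uint64_t>(a) == std::bit_cast<std::uint64_t>(b));

    return 0;
}
```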
@ -843,30 +833,6 @@ auto & Field::safeGet()
|
||||
}
|
||||
|
||||
|
||||
template <typename T>
|
||||
T & Field::reinterpret()
|
||||
{
|
||||
assert(which != Types::String); // See specialization for char
|
||||
using ValueType = std::decay_t<T>;
|
||||
ValueType * MAY_ALIAS ptr = reinterpret_cast<ValueType *>(&storage);
|
||||
return *ptr;
|
||||
}
|
||||
|
||||
// Specialize reinterpreting to char (used in ColumnUnique) to make sure Strings are reinterpreted correctly
|
||||
// inline to avoid multiple definitions
|
||||
template <>
|
||||
inline char & Field::reinterpret<char>()
|
||||
{
|
||||
if (which == Types::String)
|
||||
{
|
||||
// For String we want to return a pointer to the data, not the start of the class
|
||||
// as the layout of std::string depends on the STD version and options
|
||||
char * ptr = reinterpret_cast<String *>(&storage)->data();
|
||||
return *ptr;
|
||||
}
|
||||
return *reinterpret_cast<char *>(&storage);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
Field::Field(T && rhs, enable_if_not_field_or_bool_or_stringlike_t<T>) //-V730
|
||||
{
|
||||
|
@ -20,8 +20,13 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int CANNOT_READ_ARRAY_FROM_TEXT;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int TOO_LARGE_ARRAY_SIZE;
|
||||
}
|
||||
|
||||
static constexpr size_t MAX_ARRAY_SIZE = 1ULL << 30;
|
||||
static constexpr size_t MAX_ARRAYS_SIZE = 1ULL << 40;
|
||||
|
||||
|
||||
void SerializationArray::serializeBinary(const Field & field, WriteBuffer & ostr) const
|
||||
{
|
||||
const Array & a = field.get<const Array &>();
|
||||
@@ -125,7 +130,12 @@ namespace
    {
        ColumnArray::Offset current_size = 0;
        readIntBinary(current_size, istr);
        current_offset += current_size;

        if (unlikely(current_size > MAX_ARRAY_SIZE))
            throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size is too large: {}", current_size);
        if (unlikely(__builtin_add_overflow(current_offset, current_size, &current_offset)))
            throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Deserialization of array offsets will lead to overflow");

        offset_values[i] = current_offset;
        ++i;
    }
@ -348,6 +358,9 @@ void SerializationArray::deserializeBinaryBulkWithMultipleStreams(
|
||||
throw Exception("Nested column is longer than last offset", ErrorCodes::LOGICAL_ERROR);
|
||||
size_t nested_limit = last_offset - nested_column->size();
|
||||
|
||||
if (unlikely(nested_limit > MAX_ARRAYS_SIZE))
|
||||
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array sizes are too large: {}", nested_limit);
|
||||
|
||||
/// Adjust value size hint. Divide it to the average array size.
|
||||
settings.avg_value_size_hint = nested_limit ? settings.avg_value_size_hint / nested_limit * offset_values.size() : 0;
|
||||
|
||||
|
@@ -24,6 +24,8 @@ namespace ErrorCodes
    extern const int TOO_LARGE_STRING_SIZE;
}

static constexpr size_t MAX_STRINGS_SIZE = 1ULL << 30;

void SerializationFixedString::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
    const String & s = field.get<const String &>();
@@ -85,8 +87,17 @@ void SerializationFixedString::deserializeBinaryBulk(IColumn & column, ReadBuffe
    ColumnFixedString::Chars & data = typeid_cast<ColumnFixedString &>(column).getChars();

    size_t initial_size = data.size();
    size_t max_bytes = limit * n;
    data.resize(initial_size + max_bytes);
    size_t max_bytes;
    size_t new_data_size;

    if (unlikely(__builtin_mul_overflow(limit, n, &max_bytes)))
        throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Deserializing FixedString will lead to overflow");
    if (unlikely(max_bytes > MAX_STRINGS_SIZE))
        throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Too large sizes of FixedString to deserialize: {}", max_bytes);
    if (unlikely(__builtin_add_overflow(initial_size, max_bytes, &new_data_size)))
        throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "Deserializing FixedString will lead to overflow");

    data.resize(new_data_size);
    size_t read_bytes = istr.readBig(reinterpret_cast<char *>(&data[initial_size]), max_bytes);

    if (read_bytes % n != 0)
@ -17,6 +17,7 @@
|
||||
#include <emmintrin.h>
|
||||
#endif
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -185,6 +185,7 @@ void DatabaseOnDisk::createTable(
|
||||
if (create.attach_short_syntax)
|
||||
{
|
||||
/// Metadata already exists, table was detached
|
||||
assert(fs::exists(getObjectMetadataPath(table_name)));
|
||||
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, true);
|
||||
attachTable(local_context, table_name, table, getTableDataPath(create));
|
||||
return;
|
||||
|
@ -1259,4 +1259,24 @@ void DatabaseReplicated::createTableRestoredFromBackup(
|
||||
}
|
||||
}
|
||||
|
||||
bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const
|
||||
{
|
||||
if (query_context->getClientInfo().is_replicated_database_internal)
|
||||
return false;
|
||||
|
||||
/// Some ALTERs are not replicated on database level
|
||||
if (const auto * alter = query_ptr->as<const ASTAlterQuery>())
|
||||
{
|
||||
return !alter->isAttachAlter() && !alter->isFetchAlter() && !alter->isDropPartitionAlter();
|
||||
}
|
||||
|
||||
/// DROP DATABASE is not replicated
|
||||
if (const auto * drop = query_ptr->as<const ASTDropQuery>())
|
||||
{
|
||||
return drop->table.get();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ public:
|
||||
|
||||
/// Try to execute DLL query on current host as initial query. If query is succeed,
|
||||
/// then it will be executed on all replicas.
|
||||
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal = false);
|
||||
BlockIO tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal) override;
|
||||
|
||||
bool hasReplicationThread() const override { return true; }
|
||||
|
||||
@ -75,6 +75,8 @@ public:
|
||||
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override;
|
||||
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
|
||||
|
||||
bool shouldReplicateQuery(const ContextPtr & query_context, const ASTPtr & query_ptr) const override;
|
||||
|
||||
friend struct DatabaseReplicatedTask;
|
||||
friend class DatabaseReplicatedDDLWorker;
|
||||
private:
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <base/types.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <QueryPipeline/BlockIO.h>
|
||||
|
||||
#include <ctime>
|
||||
#include <functional>
|
||||
@ -338,6 +339,13 @@ public:
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not run a replication thread!", getEngineName());
|
||||
}
|
||||
|
||||
virtual bool shouldReplicateQuery(const ContextPtr & /*query_context*/, const ASTPtr & /*query_ptr*/) const { return false; }
|
||||
|
||||
virtual BlockIO tryEnqueueReplicatedDDL(const ASTPtr & /*query*/, ContextPtr /*query_context*/, [[maybe_unused]] bool internal = false) /// NOLINT
|
||||
{
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database engine {} does not have replicated DDL queue", getEngineName());
|
||||
}
|
||||
|
||||
/// Returns CREATE TABLE queries and corresponding tables prepared for writing to a backup.
|
||||
virtual std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & context) const;
|
||||
|
||||
|
@ -108,8 +108,19 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
|
||||
if (has_pread_nowait_support.load(std::memory_order_relaxed))
|
||||
{
|
||||
/// It reports real time spent including the time spent while thread was preempted doing nothing.
|
||||
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
|
||||
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
|
||||
/// (TaskStatsInfoGetter has about 500K RPS).
|
||||
Stopwatch watch(CLOCK_MONOTONIC);
|
||||
|
||||
SCOPE_EXIT({
|
||||
watch.stop();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
});
|
||||
|
||||
std::promise<Result> promise;
|
||||
std::future<Result> future = promise.get_future();
|
||||
|
||||
@ -135,11 +146,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
{
|
||||
/// The file has ended.
|
||||
promise.set_value({0, 0});
|
||||
|
||||
watch.stop();
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
@ -179,18 +185,10 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
|
||||
if (bytes_read)
|
||||
{
|
||||
/// It reports real time spent including the time spent while thread was preempted doing nothing.
|
||||
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
|
||||
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
|
||||
/// (TaskStatsInfoGetter has about 500K RPS).
|
||||
watch.stop();
|
||||
|
||||
/// Read successfully from page cache.
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHit);
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
promise.set_value({bytes_read, request.ignore});
|
||||
return future;
|
||||
@ -226,6 +224,12 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
setThreadName("ThreadPoolRead");
|
||||
|
||||
Stopwatch watch(CLOCK_MONOTONIC);
|
||||
SCOPE_EXIT({
|
||||
watch.stop();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
});
|
||||
|
||||
size_t bytes_read = 0;
|
||||
while (!bytes_read)
|
||||
@ -254,8 +258,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissBytes, bytes_read);
|
||||
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
|
||||
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
return Result{ .size = bytes_read, .offset = request.ignore };
|
||||
});
|
||||
|
@ -156,6 +156,8 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
|
||||
e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF ||
|
||||
e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
|
||||
return;
|
||||
|
||||
throw;
|
||||
}
|
||||
catch (const fs::filesystem_error & e)
|
||||
{
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
@ -31,6 +32,18 @@
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event DeleteS3Objects;
|
||||
extern const Event HeadS3Object;
|
||||
extern const Event ListS3Objects;
|
||||
extern const Event CopyS3Object;
|
||||
extern const Event CreateS3MultipartUpload;
|
||||
extern const Event UploadS3PartCopy;
|
||||
extern const Event AbortS3MultipartUpload;
|
||||
extern const Event CompleteS3MultipartUpload;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -96,6 +109,8 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
|
||||
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::HeadS3Object);
|
||||
Aws::S3::Model::HeadObjectRequest request;
|
||||
request.SetBucket(bucket_from);
|
||||
request.SetKey(key);
|
||||
@ -211,6 +226,7 @@ void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = client.get();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ListS3Objects);
|
||||
Aws::S3::Model::ListObjectsV2Request request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetPrefix(path);
|
||||
@ -239,6 +255,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DeleteS3Objects);
|
||||
Aws::S3::Model::DeleteObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(object.absolute_path);
|
||||
@ -284,6 +301,8 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
|
||||
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
delkeys.SetObjects(current_chunk);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DeleteS3Objects);
|
||||
Aws::S3::Model::DeleteObjectsRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetDelete(delkeys);
|
||||
@ -357,6 +376,8 @@ void S3ObjectStorage::copyObjectImpl(
|
||||
std::optional<ObjectAttributes> metadata) const
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CopyS3Object);
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(src_bucket + "/" + src_key);
|
||||
request.SetBucket(dst_bucket);
|
||||
@ -405,6 +426,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
String multipart_upload_id;
|
||||
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::CreateS3MultipartUpload);
|
||||
Aws::S3::Model::CreateMultipartUploadRequest request;
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
@ -423,6 +445,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
size_t upload_part_size = settings_ptr->s3_settings.min_upload_part_size;
|
||||
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::UploadS3PartCopy);
|
||||
Aws::S3::Model::UploadPartCopyRequest part_request;
|
||||
part_request.SetCopySource(src_bucket + "/" + src_key);
|
||||
part_request.SetBucket(dst_bucket);
|
||||
@ -434,6 +457,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
auto outcome = client_ptr->UploadPartCopy(part_request);
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::AbortS3MultipartUpload);
|
||||
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
|
||||
abort_request.SetBucket(dst_bucket);
|
||||
abort_request.SetKey(dst_key);
|
||||
@ -448,6 +472,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
}
|
||||
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::CompleteS3MultipartUpload);
|
||||
Aws::S3::Model::CompleteMultipartUploadRequest req;
|
||||
req.SetBucket(dst_bucket);
|
||||
req.SetKey(dst_key);
|
||||
|
@ -63,9 +63,10 @@ ColumnsDescription readSchemaFromFormat(
|
||||
{
|
||||
names_and_types = external_schema_reader->readSchema();
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
catch (Exception & e)
|
||||
{
|
||||
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, e.message());
|
||||
e.addMessage(fmt::format("Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
|
||||
@ -85,6 +86,12 @@ ColumnsDescription readSchemaFromFormat(
|
||||
break;
|
||||
is_eof = buf->eof();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage(fmt::format(
|
||||
"Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
|
||||
throw;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
auto exception_message = getCurrentExceptionMessage(false);
|
||||
@ -136,7 +143,21 @@ ColumnsDescription readSchemaFromFormat(
|
||||
}
|
||||
|
||||
if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode()))
|
||||
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, exception_message);
|
||||
{
|
||||
try
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage(fmt::format("Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
|
||||
throw;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, exception_message);
|
||||
}
|
||||
}
|
||||
|
||||
exception_messages += "\n" + exception_message;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Poco/UTF8Encoding.h>
|
||||
#include <Common/UTF8Helpers.h>
|
||||
#include <base/defines.h>
|
||||
|
||||
#ifdef __SSE2__
|
||||
#include <emmintrin.h>
|
||||
@ -89,9 +90,11 @@ struct LowerUpperUTF8Impl
|
||||
ColumnString::Chars & res_data,
|
||||
ColumnString::Offsets & res_offsets)
|
||||
{
|
||||
if (data.empty())
|
||||
return;
|
||||
res_data.resize(data.size());
|
||||
res_offsets.assign(offsets);
|
||||
array(data.data(), data.data() + data.size(), res_data.data());
|
||||
array(data.data(), data.data() + data.size(), offsets, res_data.data());
|
||||
}
|
||||
|
||||
static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
|
||||
@ -164,8 +167,11 @@ private:
|
||||
static constexpr auto ascii_upper_bound = '\x7f';
|
||||
static constexpr auto flip_case_mask = 'A' ^ 'a';
|
||||
|
||||
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
|
||||
static void array(const UInt8 * src, const UInt8 * src_end, const ColumnString::Offsets & offsets, UInt8 * dst)
|
||||
{
|
||||
auto offset_it = offsets.begin();
|
||||
const UInt8 * begin = src;
|
||||
|
||||
#ifdef __SSE2__
|
||||
static constexpr auto bytes_sse = sizeof(__m128i);
|
||||
const auto * src_end_sse = src + (src_end - src) / bytes_sse * bytes_sse;
|
||||
@ -213,10 +219,17 @@ private:
|
||||
else
|
||||
{
|
||||
/// UTF-8
|
||||
const auto * expected_end = src + bytes_sse;
|
||||
|
||||
size_t offset_from_begin = src - begin;
|
||||
while (offset_from_begin >= *offset_it)
|
||||
++offset_it;
|
||||
/// Do not allow one row influence another (since row may have invalid sequence, and break the next)
|
||||
const UInt8 * row_end = begin + *offset_it;
|
||||
chassert(row_end >= src);
|
||||
const UInt8 * expected_end = std::min(src + bytes_sse, row_end);
|
||||
|
||||
while (src < expected_end)
|
||||
toCase(src, src_end, dst);
|
||||
toCase(src, expected_end, dst);
|
||||
|
||||
/// adjust src_end_sse by pushing it forward or backward
|
||||
const auto diff = src - expected_end;
|
||||
@ -229,10 +242,22 @@ private:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Find which offset src has now
|
||||
while (offset_it != offsets.end() && static_cast<size_t>(src - begin) >= *offset_it)
|
||||
++offset_it;
|
||||
#endif
|
||||
/// handle remaining symbols
|
||||
|
||||
/// handle remaining symbols, row by row (to avoid influence of bad UTF8 symbols from one row, to another)
|
||||
while (src < src_end)
|
||||
toCase(src, src_end, dst);
|
||||
{
|
||||
const UInt8 * row_end = begin + *offset_it;
|
||||
chassert(row_end >= src);
|
||||
|
||||
while (src < row_end)
|
||||
toCase(src, row_end, dst);
|
||||
++offset_it;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
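The LowerUpperUTF8Impl change above threads the column offsets into `array()` so that case conversion respects row boundaries: an invalid UTF-8 sequence at the end of one row can no longer make the converter read into the next row. A simplified standalone sketch of the same idea on a flattened string column (concatenated bytes plus end offsets); the SSE fast path and real UTF-8 case conversion are deliberately omitted:

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

using Offsets = std::vector<uint64_t>;  // offsets[i] = end position of row i in the byte buffer

// Process each row independently: a broken multi-byte sequence in one row
// cannot push the cursor past that row's end into the next one.
static void lowerAsciiPerRow(const std::vector<uint8_t> & data, const Offsets & offsets,
                             std::vector<uint8_t> & res)
{
    res = data;
    size_t pos = 0;
    for (uint64_t row_end : offsets)
    {
        while (pos < row_end)
        {
            uint8_t c = res[pos];
            if (c < 0x80)                        /// ASCII: single byte, lowercase in place
                res[pos] = (c >= 'A' && c <= 'Z') ? c + ('a' - 'A') : c;
            /// Multi-byte or invalid lead byte: a real implementation would decode here;
            /// the key point is that we never advance past row_end.
            ++pos;
        }
        pos = row_end;  /// hard stop at the row boundary
    }
}

int main()
{
    std::vector<uint8_t> data = {'F', 'o', 'o', 0xE2, 'B', 'A', 'R'};  // rows "Foo\xe2" and "BAR"
    Offsets offsets = {4, 7};
    std::vector<uint8_t> res;
    lowerAsciiPerRow(data, offsets, res);
    std::cout << std::string(res.begin(), res.end()) << '\n';  // "foo\xe2bar"
}
```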
|
@ -1,6 +1,7 @@
|
||||
#include <mutex>
|
||||
#include <base/bit_cast.h>
|
||||
|
||||
#include <Common/FieldVisitorDump.h>
|
||||
#include <Common/FieldVisitorConvertToNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
@ -920,8 +921,7 @@ private:
|
||||
ColumnString::Offset current_dst_default_offset = 0;
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
Field key = src[i];
|
||||
const auto * it = table.find(key.reinterpret<UInt64>());
|
||||
const auto * it = table.find(bit_cast<UInt64>(src[i]));
|
||||
StringRef ref;
|
||||
|
||||
if (it)
|
||||
@ -1081,6 +1081,22 @@ private:
|
||||
|
||||
mutable Cache cache;
|
||||
|
||||
|
||||
static UInt64 bitCastToUInt64(const Field & x)
|
||||
{
|
||||
switch (x.getType())
|
||||
{
|
||||
case Field::Types::UInt64: return x.get<UInt64>();
|
||||
case Field::Types::Int64: return x.get<Int64>();
|
||||
case Field::Types::Float64: return std::bit_cast<UInt64>(x.get<Float64>());
|
||||
case Field::Types::Bool: return x.get<bool>();
|
||||
case Field::Types::Decimal32: return x.get<DecimalField<Decimal32>>().getValue();
|
||||
case Field::Types::Decimal64: return x.get<DecimalField<Decimal64>>().getValue();
|
||||
default:
|
||||
throw Exception("Unexpected type in function 'transform'", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
}
|
||||
|
||||
/// Can be called from different threads. It works only on the first call.
|
||||
void initialize(const Array & from, const Array & to, const ColumnsWithTypeAndName & arguments) const
|
||||
{
|
||||
@ -1151,20 +1167,8 @@ private:
|
||||
if (key.isNull())
|
||||
continue;
|
||||
|
||||
// Field may be of Float type, but for the purpose of bitwise
|
||||
// equality we can treat them as UInt64, hence the reinterpret().
|
||||
if (to[0].getType() ==Field::Types::Decimal32)
|
||||
{
|
||||
table[key.reinterpret<Decimal32>()] = (*used_to)[i].reinterpret<Decimal32>();
|
||||
}
|
||||
else if (to[0].getType() ==Field::Types::Decimal64)
|
||||
{
|
||||
table[key.reinterpret<Decimal32>()] = (*used_to)[i].reinterpret<Decimal64>();
|
||||
}
|
||||
else
|
||||
{
|
||||
table[key.reinterpret<UInt64>()] = (*used_to)[i].reinterpret<UInt64>();
|
||||
}
|
||||
/// Field may be of Float type, but for the purpose of bitwise equality we can treat them as UInt64
|
||||
table[bitCastToUInt64(key)] = bitCastToUInt64((*used_to)[i]);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -1179,7 +1183,7 @@ private:
|
||||
|
||||
const String & str_to = to[i].get<const String &>();
|
||||
StringRef ref{cache.string_pool.insert(str_to.data(), str_to.size() + 1), str_to.size() + 1};
|
||||
table[key.reinterpret<UInt64>()] = ref;
|
||||
table[bitCastToUInt64(key)] = ref;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1193,7 +1197,7 @@ private:
|
||||
{
|
||||
const String & str_from = from[i].get<const String &>();
|
||||
StringRef ref{cache.string_pool.insert(str_from.data(), str_from.size() + 1), str_from.size() + 1};
|
||||
table[ref] = (*used_to)[i].reinterpret<UInt64>();
|
||||
table[ref] = bitCastToUInt64((*used_to)[i]);
|
||||
}
|
||||
}
|
||||
else
|
||||
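The transform() hunks above replace `Field::reinterpret` with an explicit `bitCastToUInt64` switch: every supported key type is converted to a UInt64 bit pattern before the hash-table lookup, so keys that are bitwise equal compare equal regardless of their declared type. A self-contained sketch of that dispatch over a `std::variant` stand-in for Field (names here are illustrative, not the real Field API):

```cpp
#include <bit>
#include <cstdint>
#include <iostream>
#include <type_traits>
#include <unordered_map>
#include <variant>

// Simplified stand-in for DB::Field: only the alternatives the switch above cares about.
using Key = std::variant<uint64_t, int64_t, double, bool>;

static uint64_t bitCastToUInt64(const Key & k)
{
    return std::visit([](auto v) -> uint64_t
    {
        using T = decltype(v);
        if constexpr (std::is_same_v<T, double>)
            return std::bit_cast<uint64_t>(v);   /// keep the exact bit pattern for floats
        else
            return static_cast<uint64_t>(v);     /// integral types: plain widening conversion
    }, k);
}

int main()
{
    std::unordered_map<uint64_t, const char *> table;
    table[bitCastToUInt64(Key{int64_t{-1}})] = "minus one";
    table[bitCastToUInt64(Key{3.5})] = "three and a half";

    std::cout << table.at(bitCastToUInt64(Key{3.5})) << '\n';
    std::cout << table.at(bitCastToUInt64(Key{int64_t{-1}})) << '\n';
}
```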
|
@ -24,6 +24,7 @@ namespace ProfileEvents
|
||||
extern const Event ReadBufferFromS3Bytes;
|
||||
extern const Event ReadBufferFromS3RequestsErrors;
|
||||
extern const Event ReadBufferSeekCancelConnection;
|
||||
extern const Event GetS3ObjectRequest;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -275,6 +276,7 @@ SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
|
||||
|
||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::GetS3ObjectRequest);
|
||||
Aws::S3::Model::GetObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <Common/config.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
@ -24,6 +25,10 @@
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event WriteBufferFromS3Bytes;
|
||||
extern const Event S3WriteBytes;
|
||||
extern const Event CompleteS3MultipartUpload;
|
||||
extern const Event UploadS3Part;
|
||||
extern const Event PutS3ObjectRequest;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -303,6 +308,7 @@ void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & re
|
||||
|
||||
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::UploadS3Part);
|
||||
auto outcome = client_ptr->UploadPart(task.req);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
@ -326,6 +332,7 @@ void WriteBufferFromS3::completeMultipartUpload()
|
||||
if (tags.empty())
|
||||
throw Exception("Failed to complete multipart upload. No parts have uploaded", ErrorCodes::S3_ERROR);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CompleteS3MultipartUpload);
|
||||
Aws::S3::Model::CompleteMultipartUploadRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
@ -429,6 +436,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
|
||||
|
||||
void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::PutS3ObjectRequest);
|
||||
auto outcome = client_ptr->PutObject(task.req);
|
||||
bool with_pool = static_cast<bool>(schedule);
|
||||
if (outcome.IsSuccess())
|
||||
|
@ -50,19 +50,15 @@ OutputBlockColumns prepareOutputBlockColumns(
|
||||
|
||||
if (aggregate_functions[i]->isState())
|
||||
{
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(final_aggregate_columns[i].get()))
|
||||
for (auto & pool : aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
|
||||
/// Aggregate state can be wrapped into array if aggregate function ends with -Resample combinator.
|
||||
final_aggregate_columns[i]->forEachSubcolumn(
|
||||
[&aggregates_pools](auto & subcolumn)
|
||||
{
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
|
||||
for (auto & pool : aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
});
|
||||
auto callback = [&](auto & subcolumn)
|
||||
{
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
|
||||
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(subcolumn.get()))
|
||||
for (auto & pool : aggregates_pools)
|
||||
column_aggregate_func->addArena(pool);
|
||||
};
|
||||
callback(final_aggregate_columns[i]);
|
||||
final_aggregate_columns[i]->forEachSubcolumnRecursively(callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
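The prepareOutputBlockColumns change above applies the arena-attaching callback to the result column itself and, via `forEachSubcolumnRecursively`, to every nested subcolumn, so -State results wrapped by combinators such as -Resample still share the arenas. A tiny standalone sketch of the "visit self, then recurse into subcolumns" pattern on a toy column hierarchy (not the real IColumn interface):

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy stand-in for IColumn with nested subcolumns (e.g. the data column inside an array column).
struct Column
{
    std::string name;
    std::vector<std::shared_ptr<Column>> subcolumns;

    void forEachSubcolumnRecursively(const std::function<void(Column &)> & callback)
    {
        for (auto & sub : subcolumns)
        {
            callback(*sub);
            sub->forEachSubcolumnRecursively(callback);
        }
    }
};

int main()
{
    auto data = std::make_shared<Column>(Column{"aggregate_states", {}});
    auto array = std::make_shared<Column>(Column{"array", {data}});
    Column root{"result", {array}};

    auto attach_arena = [](Column & c) { std::cout << "attach arenas to " << c.name << '\n'; };

    attach_arena(root);                              /// the column itself
    root.forEachSubcolumnRecursively(attach_arena);  /// ...and all nested subcolumns
}
```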
|
@ -1754,8 +1754,11 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
|
||||
* It is also tricky, because there are aggregate functions with "-State" modifier.
|
||||
* When we call "insertResultInto" for them, they insert a pointer to the state to ColumnAggregateFunction
|
||||
* and ColumnAggregateFunction will take ownership of this state.
|
||||
* So, for aggregate functions with "-State" modifier, the state must not be destroyed
|
||||
* after it has been transferred to ColumnAggregateFunction.
|
||||
* So, for aggregate functions with "-State" modifier, only states of all combinators that are used
|
||||
* after -State will be destroyed after result has been transferred to ColumnAggregateFunction.
|
||||
* For example, if we have function `uniqStateForEachMap` after aggregation we should destroy all states that
|
||||
* were created by combinators `-ForEach` and `-Map`, because resulting ColumnAggregateFunction will be
|
||||
* responsible only for destruction of the states created by `uniq` function.
|
||||
* But we should mark that the data no longer owns these states.
|
||||
*/
|
||||
|
||||
@ -1778,8 +1781,8 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
|
||||
|
||||
/** Destroy states that are no longer needed. This loop does not throw.
|
||||
*
|
||||
* Don't destroy states for "-State" aggregate functions,
|
||||
* because the ownership of this state is transferred to ColumnAggregateFunction
|
||||
* For functions with -State combinator we destroy only states of all combinators that are used
|
||||
* after -State, because the ownership of the rest states is transferred to ColumnAggregateFunction
|
||||
* and ColumnAggregateFunction will take care.
|
||||
*
|
||||
* But it's only for states that has been transferred to ColumnAggregateFunction
|
||||
@ -1787,10 +1790,10 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
|
||||
*/
|
||||
for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i)
|
||||
{
|
||||
/// If ownership was not transferred to ColumnAggregateFunction.
|
||||
if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState()))
|
||||
aggregate_functions[destroy_i]->destroy(
|
||||
mapped + offsets_of_aggregate_states[destroy_i]);
|
||||
if (destroy_i < insert_i)
|
||||
aggregate_functions[destroy_i]->destroyUpToState(mapped + offsets_of_aggregate_states[destroy_i]);
|
||||
else
|
||||
aggregate_functions[destroy_i]->destroy(mapped + offsets_of_aggregate_states[destroy_i]);
|
||||
}
|
||||
|
||||
/// Mark the cell as destroyed so it will not be destroyed in destructor.
|
||||
@ -1855,12 +1858,7 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
|
||||
size_t destroy_index = aggregate_functions_destroy_index;
|
||||
++aggregate_functions_destroy_index;
|
||||
|
||||
/// For State AggregateFunction ownership of aggregate place is passed to result column after insert
|
||||
bool is_state = aggregate_functions[destroy_index]->isState();
|
||||
bool destroy_place_after_insert = !is_state;
|
||||
|
||||
aggregate_functions[destroy_index]->insertResultIntoBatch(
|
||||
0, places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
|
||||
aggregate_functions[destroy_index]->insertResultIntoBatch(0, places.size(), places.data(), offset, *final_aggregate_column, arena);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
|
@ -115,7 +115,7 @@ std::map<size_t, std::vector<ASTPtr>> moveExpressionToJoinOn(
|
||||
const Aliases & aliases)
|
||||
{
|
||||
std::map<size_t, std::vector<ASTPtr>> asts_to_join_on;
|
||||
for (const auto & node : collectConjunctions(ast))
|
||||
for (const auto & node : splitConjunctionsAst(ast))
|
||||
{
|
||||
if (const auto * func = node->as<ASTFunction>(); func && func->name == NameEquals::name)
|
||||
{
|
||||
|
@ -1988,6 +1988,23 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
}
|
||||
}
|
||||
|
||||
// Here we need to set order by expression as required output to avoid
|
||||
// their removal from the ActionsDAG.
|
||||
const auto * select_query = query_analyzer.getSelectQuery();
|
||||
if (select_query->orderBy())
|
||||
{
|
||||
for (auto & child : select_query->orderBy()->children)
|
||||
{
|
||||
auto * ast = child->as<ASTOrderByElement>();
|
||||
ASTPtr order_expression = ast->children.at(0);
|
||||
if (auto * function = order_expression->as<ASTFunction>();
|
||||
function && (function->is_window_function || function->compute_after_window_functions))
|
||||
continue;
|
||||
const String & column_name = order_expression->getColumnName();
|
||||
chain.getLastStep().addRequiredOutput(column_name);
|
||||
}
|
||||
}
|
||||
|
||||
before_window = chain.getLastActions();
|
||||
finalize_chain(chain);
|
||||
|
||||
@ -2007,7 +2024,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
// produced the expressions required to calculate window functions.
|
||||
// They are not needed in the final SELECT result. Knowing the correct
|
||||
// list of columns is important when we apply SELECT DISTINCT later.
|
||||
const auto * select_query = query_analyzer.getSelectQuery();
|
||||
for (const auto & child : select_query->select()->children)
|
||||
{
|
||||
step.addRequiredOutput(child->getColumnName());
|
||||
|
@ -322,22 +322,35 @@ std::optional<size_t> IdentifierMembershipCollector::getIdentsMembership(ASTPtr
|
||||
return IdentifierSemantic::getIdentsMembership(ast, tables, aliases);
|
||||
}
|
||||
|
||||
static void collectConjunctions(const ASTPtr & node, std::vector<ASTPtr> & members)
|
||||
void splitConjunctionsAst(const ASTPtr & node, ASTs & result)
|
||||
{
|
||||
if (const auto * func = node->as<ASTFunction>(); func && func->name == "and")
|
||||
{
|
||||
for (const auto & child : func->arguments->children)
|
||||
collectConjunctions(child, members);
|
||||
if (!node)
|
||||
return;
|
||||
|
||||
result.emplace_back(node);
|
||||
|
||||
for (size_t idx = 0; idx < result.size();)
|
||||
{
|
||||
ASTPtr expression = result.at(idx);
|
||||
|
||||
if (const auto * function = expression->as<ASTFunction>(); function && function->name == "and")
|
||||
{
|
||||
result.erase(result.begin() + idx);
|
||||
|
||||
for (auto & child : function->arguments->children)
|
||||
result.emplace_back(child);
|
||||
|
||||
continue;
|
||||
}
|
||||
++idx;
|
||||
}
|
||||
members.push_back(node);
|
||||
}
|
||||
|
||||
std::vector<ASTPtr> collectConjunctions(const ASTPtr & node)
|
||||
ASTs splitConjunctionsAst(const ASTPtr & node)
|
||||
{
|
||||
std::vector<ASTPtr> members;
|
||||
collectConjunctions(node, members);
|
||||
return members;
|
||||
std::vector<ASTPtr> result;
|
||||
splitConjunctionsAst(node, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -105,6 +105,7 @@ private:
|
||||
};
|
||||
|
||||
/// Split expression `expr_1 AND expr_2 AND ... AND expr_n` into vector `[expr_1, expr_2, ..., expr_n]`
|
||||
std::vector<ASTPtr> collectConjunctions(const ASTPtr & node);
|
||||
ASTs splitConjunctionsAst(const ASTPtr & node);
|
||||
void splitConjunctionsAst(const ASTPtr & node, ASTs & result);
|
||||
|
||||
}
|
||||
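collectConjunctions is renamed to splitConjunctionsAst and rewritten iteratively: the node is pushed into the result vector, and any `and` node found there is erased and replaced by its arguments until no conjunctions remain. A compact standalone sketch of that flattening on a toy expression tree (the real version works on ASTFunction/ASTPtr):

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy expression node: either a leaf predicate or an "and" over children.
struct Expr
{
    std::string text;     // non-empty for leaves, e.g. "a = 1"
    bool is_and = false;
    std::vector<std::shared_ptr<Expr>> children;
};
using ExprPtr = std::shared_ptr<Expr>;

// Split `e1 AND e2 AND ... AND en` (possibly nested) into [e1, e2, ..., en].
static void splitConjunctions(const ExprPtr & node, std::vector<ExprPtr> & result)
{
    if (!node)
        return;
    result.push_back(node);
    for (size_t idx = 0; idx < result.size();)
    {
        ExprPtr expr = result[idx];   /// keep the node alive before erasing its slot
        if (expr->is_and)
        {
            result.erase(result.begin() + idx);
            for (const auto & child : expr->children)
                result.push_back(child);
            continue;   /// re-examine whatever moved into position idx
        }
        ++idx;
    }
}

int main()
{
    auto leaf = [](std::string s) { return std::make_shared<Expr>(Expr{std::move(s), false, {}}); };
    auto land = [](std::vector<ExprPtr> ch) { return std::make_shared<Expr>(Expr{"", true, std::move(ch)}); };

    ExprPtr where = land({leaf("a = 1"), land({leaf("b > 2"), leaf("c < 3")})});
    std::vector<ExprPtr> conjuncts;
    splitConjunctions(where, conjuncts);
    for (const auto & c : conjuncts)
        std::cout << c->text << '\n';   // a = 1, b > 2, c < 3
}
```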
|
@ -78,15 +78,11 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
|
||||
query_ptr->as<ASTAlterQuery &>().setDatabase(table_id.database_name);
|
||||
|
||||
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
if (typeid_cast<DatabaseReplicated *>(database.get())
|
||||
&& !getContext()->getClientInfo().is_replicated_database_internal
|
||||
&& !alter.isAttachAlter()
|
||||
&& !alter.isFetchAlter()
|
||||
&& !alter.isDropPartitionAlter())
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
|
||||
guard->releaseTableLock();
|
||||
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
}
|
||||
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
||||
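This and the following interpreter hunks replace the repeated `typeid_cast<DatabaseReplicated *>` plus client-info checks with a single virtual `shouldReplicateQuery`/`tryEnqueueReplicatedDDL` pair on the database interface. A minimal sketch of that "move the RTTI check behind a virtual method" refactor, using toy classes rather than the real IDatabase/DatabaseReplicated API:

```cpp
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

struct IDatabase
{
    virtual ~IDatabase() = default;
    /// Default: ordinary databases never replicate DDL on the database level.
    virtual bool shouldReplicateQuery(const std::string & /*query*/) const { return false; }
    virtual void tryEnqueueReplicatedDDL(const std::string & /*query*/)
    {
        throw std::logic_error("not a Replicated database");
    }
};

struct DatabaseReplicated : IDatabase
{
    bool internal_query = false;   /// stand-in for ClientInfo::is_replicated_database_internal
    bool shouldReplicateQuery(const std::string &) const override { return !internal_query; }
    void tryEnqueueReplicatedDDL(const std::string & query) override
    {
        std::cout << "enqueued on all replicas: " << query << '\n';
    }
};

void executeAlter(IDatabase & database, const std::string & query)
{
    /// Every interpreter now asks the database instead of doing typeid_cast itself.
    if (database.shouldReplicateQuery(query))
    {
        database.tryEnqueueReplicatedDDL(query);
        return;
    }
    std::cout << "executed locally: " << query << '\n';
}

int main()
{
    DatabaseReplicated replicated;
    IDatabase ordinary;
    executeAlter(replicated, "ALTER TABLE t ADD COLUMN x UInt8");
    executeAlter(ordinary, "ALTER TABLE t ADD COLUMN x UInt8");
}
```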
|
@ -38,11 +38,11 @@ BlockIO InterpreterCreateIndexQuery::execute()
|
||||
query_ptr->as<ASTCreateIndexQuery &>().setDatabase(table_id.database_name);
|
||||
|
||||
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
if (typeid_cast<DatabaseReplicated *>(database.get()) && !current_context->getClientInfo().is_replicated_database_internal)
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
|
||||
guard->releaseTableLock();
|
||||
return assert_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, current_context);
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, current_context);
|
||||
}
|
||||
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, current_context);
|
||||
|
@ -1001,27 +1001,27 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
String current_database = getContext()->getCurrentDatabase();
|
||||
auto database_name = create.database ? create.getDatabase() : current_database;
|
||||
|
||||
DDLGuardPtr ddl_guard;
|
||||
|
||||
// If this is a stub ATTACH query, read the query definition from the database
|
||||
if (create.attach && !create.storage && !create.columns_list)
|
||||
{
|
||||
auto database = DatabaseCatalog::instance().getDatabase(database_name);
|
||||
|
||||
if (database->getEngineName() == "Replicated")
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable());
|
||||
|
||||
if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get());
|
||||
ptr && !getContext()->getClientInfo().is_replicated_database_internal)
|
||||
{
|
||||
create.setDatabase(database_name);
|
||||
guard->releaseTableLock();
|
||||
return ptr->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
|
||||
}
|
||||
create.setDatabase(database_name);
|
||||
guard->releaseTableLock();
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
|
||||
}
|
||||
|
||||
if (!create.cluster.empty())
|
||||
return executeQueryOnCluster(create);
|
||||
|
||||
/// For short syntax of ATTACH query we have to lock table name here, before reading metadata
|
||||
/// and hold it until table is attached
|
||||
ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.getTable());
|
||||
|
||||
bool if_not_exists = create.if_not_exists;
|
||||
|
||||
// Table SQL definition is available even if the table is detached (even permanently)
|
||||
@ -1053,6 +1053,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
|
||||
if (create.attach_from_path)
|
||||
{
|
||||
chassert(!ddl_guard);
|
||||
fs::path user_files = fs::path(getContext()->getUserFilesPath()).lexically_normal();
|
||||
fs::path root_path = fs::path(getContext()->getPath()).lexically_normal();
|
||||
|
||||
@ -1145,27 +1146,30 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
if (need_add_to_database)
|
||||
database = DatabaseCatalog::instance().getDatabase(database_name);
|
||||
|
||||
if (need_add_to_database && database->getEngineName() == "Replicated")
|
||||
if (need_add_to_database && database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
chassert(!ddl_guard);
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
|
||||
|
||||
if (auto * ptr = typeid_cast<DatabaseReplicated *>(database.get());
|
||||
ptr && !getContext()->getClientInfo().is_replicated_database_internal)
|
||||
{
|
||||
assertOrSetUUID(create, database);
|
||||
guard->releaseTableLock();
|
||||
return ptr->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
|
||||
}
|
||||
assertOrSetUUID(create, database);
|
||||
guard->releaseTableLock();
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, getContext(), internal);
|
||||
}
|
||||
|
||||
if (!create.cluster.empty())
|
||||
{
|
||||
chassert(!ddl_guard);
|
||||
return executeQueryOnCluster(create);
|
||||
}
|
||||
|
||||
if (create.replace_table)
|
||||
{
|
||||
chassert(!ddl_guard);
|
||||
return doCreateOrReplaceTable(create, properties);
|
||||
}
|
||||
|
||||
/// Actually creates table
|
||||
bool created = doCreateTable(create, properties);
|
||||
bool created = doCreateTable(create, properties, ddl_guard);
|
||||
ddl_guard.reset();
|
||||
|
||||
if (!created) /// Table already exists
|
||||
return {};
|
||||
@ -1180,7 +1184,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
}
|
||||
|
||||
bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
const InterpreterCreateQuery::TableProperties & properties)
|
||||
const InterpreterCreateQuery::TableProperties & properties,
|
||||
DDLGuardPtr & ddl_guard)
|
||||
{
|
||||
if (create.temporary)
|
||||
{
|
||||
@ -1193,16 +1198,12 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<DDLGuard> guard;
|
||||
if (!ddl_guard)
|
||||
ddl_guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
|
||||
|
||||
String data_path;
|
||||
DatabasePtr database;
|
||||
|
||||
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
|
||||
* If table doesn't exist, one thread is creating table, while others wait in DDLGuard.
|
||||
*/
|
||||
guard = DatabaseCatalog::instance().getDDLGuard(create.getDatabase(), create.getTable());
|
||||
|
||||
database = DatabaseCatalog::instance().getDatabase(create.getDatabase());
|
||||
assertOrSetUUID(create, database);
|
||||
|
||||
@ -1411,7 +1412,9 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
|
||||
try
|
||||
{
|
||||
/// Create temporary table (random name will be generated)
|
||||
[[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties);
|
||||
DDLGuardPtr ddl_guard;
|
||||
[[maybe_unused]] bool done = InterpreterCreateQuery(query_ptr, create_context).doCreateTable(create, properties, ddl_guard);
|
||||
ddl_guard.reset();
|
||||
assert(done);
|
||||
created = true;
|
||||
|
||||
|
@ -18,7 +18,9 @@ class ASTExpressionList;
|
||||
class ASTConstraintDeclaration;
|
||||
class ASTStorage;
|
||||
class IDatabase;
|
||||
class DDLGuard;
|
||||
using DatabasePtr = std::shared_ptr<IDatabase>;
|
||||
using DDLGuardPtr = std::unique_ptr<DDLGuard>;
|
||||
|
||||
|
||||
/** Allows to create new table or database,
|
||||
@ -89,7 +91,7 @@ private:
|
||||
AccessRightsElements getRequiredAccess() const;
|
||||
|
||||
/// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false.
|
||||
bool doCreateTable(ASTCreateQuery & create, const TableProperties & properties);
|
||||
bool doCreateTable(ASTCreateQuery & create, const TableProperties & properties, DDLGuardPtr & ddl_guard);
|
||||
BlockIO doCreateOrReplaceTable(ASTCreateQuery & create, const InterpreterCreateQuery::TableProperties & properties);
|
||||
/// Inserts data in created table if it's CREATE ... SELECT
|
||||
BlockIO fillTableIfNeeded(const ASTCreateQuery & create);
|
||||
|
@ -48,12 +48,11 @@ BlockIO InterpreterDeleteQuery::execute()
|
||||
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");
|
||||
|
||||
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
if (typeid_cast<DatabaseReplicated *>(database.get())
|
||||
&& !getContext()->getClientInfo().is_replicated_database_internal)
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
|
||||
guard->releaseTableLock();
|
||||
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
}
|
||||
|
||||
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
|
||||
|
@ -36,11 +36,11 @@ BlockIO InterpreterDropIndexQuery::execute()
|
||||
query_ptr->as<ASTDropIndexQuery &>().setDatabase(table_id.database_name);
|
||||
|
||||
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
if (typeid_cast<DatabaseReplicated *>(database.get()) && !current_context->getClientInfo().is_replicated_database_internal)
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
|
||||
guard->releaseTableLock();
|
||||
return assert_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, current_context);
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, current_context);
|
||||
}
|
||||
|
||||
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, current_context);
|
||||
|
@ -139,9 +139,6 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
|
||||
|
||||
/// Prevents recursive drop from drop database query. The original query must specify a table.
|
||||
bool is_drop_or_detach_database = !query_ptr->as<ASTDropQuery>()->table;
|
||||
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
|
||||
!context_->getClientInfo().is_replicated_database_internal &&
|
||||
!is_drop_or_detach_database;
|
||||
|
||||
AccessFlags drop_storage;
|
||||
|
||||
@ -152,7 +149,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
|
||||
else
|
||||
drop_storage = AccessType::DROP_TABLE;
|
||||
|
||||
if (is_replicated_ddl_query)
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
if (query.kind == ASTDropQuery::Kind::Detach)
|
||||
context_->checkAccess(drop_storage, table_id);
|
||||
@ -163,7 +160,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
|
||||
|
||||
ddl_guard->releaseTableLock();
|
||||
table.reset();
|
||||
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query.clone(), context_);
|
||||
return database->tryEnqueueReplicatedDDL(query.clone(), context_);
|
||||
}
|
||||
|
||||
if (query.kind == ASTDropQuery::Kind::Detach)
|
||||
|
@ -107,7 +107,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
|
||||
}
|
||||
|
||||
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
|
||||
if (typeid_cast<DatabaseReplicated *>(database.get()) && !getContext()->getClientInfo().is_replicated_database_internal)
|
||||
if (database->shouldReplicateQuery(getContext(), query_ptr))
|
||||
{
|
||||
if (1 < descriptions.size())
|
||||
throw Exception(
|
||||
@ -120,7 +120,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
|
||||
UniqueTableName to(elem.to_database_name, elem.to_table_name);
|
||||
ddl_guards[from]->releaseTableLock();
|
||||
ddl_guards[to]->releaseTableLock();
|
||||
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Interpreters/ExtractExpressionInfoVisitor.h>
|
||||
#include <Interpreters/PredicateRewriteVisitor.h>
|
||||
#include <Interpreters/getTableExpressions.h>
|
||||
#include <Interpreters/IdentifierSemantic.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
@ -51,42 +52,14 @@ bool PredicateExpressionsOptimizer::optimize(ASTSelectQuery & select_query)
|
||||
return false;
|
||||
}
|
||||
|
||||
static ASTs splitConjunctionPredicate(const std::initializer_list<const ASTPtr> & predicates)
|
||||
{
|
||||
std::vector<ASTPtr> res;
|
||||
|
||||
for (const auto & predicate : predicates)
|
||||
{
|
||||
if (!predicate)
|
||||
continue;
|
||||
|
||||
res.emplace_back(predicate);
|
||||
|
||||
for (size_t idx = 0; idx < res.size();)
|
||||
{
|
||||
ASTPtr expression = res.at(idx);
|
||||
|
||||
if (const auto * function = expression->as<ASTFunction>(); function && function->name == "and")
|
||||
{
|
||||
res.erase(res.begin() + idx);
|
||||
|
||||
for (auto & child : function->arguments->children)
|
||||
res.emplace_back(child);
|
||||
|
||||
continue;
|
||||
}
|
||||
++idx;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
std::vector<ASTs> PredicateExpressionsOptimizer::extractTablesPredicates(const ASTPtr & where, const ASTPtr & prewhere)
|
||||
{
|
||||
std::vector<ASTs> tables_predicates(tables_with_columns.size());
|
||||
|
||||
for (const auto & predicate_expression : splitConjunctionPredicate({where, prewhere}))
|
||||
ASTs predicate_expressions;
|
||||
splitConjunctionsAst(where, predicate_expressions);
|
||||
splitConjunctionsAst(prewhere, predicate_expressions);
|
||||
for (const auto & predicate_expression : predicate_expressions)
|
||||
{
|
||||
ExpressionInfoVisitor::Data expression_info{WithContext{getContext()}, tables_with_columns};
|
||||
ExpressionInfoVisitor(expression_info).visit(predicate_expression);
|
||||
@ -186,7 +159,7 @@ bool PredicateExpressionsOptimizer::tryMovePredicatesFromHavingToWhere(ASTSelect
|
||||
return res;
|
||||
};
|
||||
|
||||
for (const auto & moving_predicate: splitConjunctionPredicate({select_query.having()}))
|
||||
for (const auto & moving_predicate : splitConjunctionsAst(select_query.having()))
|
||||
{
|
||||
TablesWithColumns tables;
|
||||
ExpressionInfoVisitor::Data expression_info{WithContext{getContext()}, tables};
|
||||
|
@ -133,6 +133,36 @@ static void cleanAliasAndCollectIdentifiers(ASTPtr & predicate, std::vector<ASTI
|
||||
identifiers.emplace_back(identifier);
|
||||
}
|
||||
|
||||
|
||||
/// Clean aliases and use aliased name
|
||||
/// Transforms `(a = b as c) AND (x = y)` to `(a = c) AND (x = y)`
|
||||
static void useAliasInsteadOfIdentifier(const ASTPtr & predicate)
|
||||
{
|
||||
if (!predicate->as<ASTSubquery>())
|
||||
{
|
||||
for (auto & children : predicate->children)
|
||||
useAliasInsteadOfIdentifier(children);
|
||||
}
|
||||
|
||||
if (const auto alias = predicate->tryGetAlias(); !alias.empty())
|
||||
{
|
||||
if (ASTIdentifier * identifier = predicate->as<ASTIdentifier>())
|
||||
identifier->setShortName(alias);
|
||||
predicate->setAlias({});
|
||||
}
|
||||
}
|
||||
|
||||
static void getConjunctionHashesFrom(const ASTPtr & ast, std::set<IAST::Hash> & hashes)
|
||||
{
|
||||
for (const auto & pred : splitConjunctionsAst(ast))
|
||||
{
|
||||
/// Clone not to modify `ast`
|
||||
ASTPtr pred_copy = pred->clone();
|
||||
useAliasInsteadOfIdentifier(pred_copy);
|
||||
hashes.emplace(pred_copy->getTreeHash());
|
||||
}
|
||||
}
|
||||
|
||||
bool PredicateRewriteVisitorData::rewriteSubquery(ASTSelectQuery & subquery, const Names & inner_columns)
|
||||
{
|
||||
if ((!optimize_final && subquery.final())
|
||||
@ -143,12 +173,27 @@ bool PredicateRewriteVisitorData::rewriteSubquery(ASTSelectQuery & subquery, con
|
||||
return false;
|
||||
|
||||
Names outer_columns = table_columns.columns.getNames();
|
||||
|
||||
/// Do not add same conditions twice to avoid extra rewrites with exponential blowup
|
||||
/// (e.g. in case of deep complex query with lots of JOINs)
|
||||
std::set<IAST::Hash> hashes;
|
||||
getConjunctionHashesFrom(subquery.where(), hashes);
|
||||
getConjunctionHashesFrom(subquery.having(), hashes);
|
||||
|
||||
bool is_changed = false;
|
||||
for (const auto & predicate : predicates)
|
||||
{
|
||||
std::vector<ASTIdentifier *> identifiers;
|
||||
ASTPtr optimize_predicate = predicate->clone();
|
||||
cleanAliasAndCollectIdentifiers(optimize_predicate, identifiers);
|
||||
|
||||
auto predicate_hash = optimize_predicate->getTreeHash();
|
||||
if (hashes.contains(predicate_hash))
|
||||
continue;
|
||||
|
||||
hashes.emplace(predicate_hash);
|
||||
is_changed = true;
|
||||
|
||||
for (const auto & identifier : identifiers)
|
||||
{
|
||||
IdentifierSemantic::setColumnShortName(*identifier, table_columns.table);
|
||||
@ -169,7 +214,7 @@ bool PredicateRewriteVisitorData::rewriteSubquery(ASTSelectQuery & subquery, con
|
||||
subquery.having() ? makeASTFunction("and", optimize_predicate, subquery.having()) : optimize_predicate);
|
||||
}
|
||||
|
||||
return true;
|
||||
return is_changed;
|
||||
}
|
||||
|
||||
}
|
||||
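The rewriteSubquery change collects tree hashes of the conjuncts already present in the subquery's WHERE and HAVING and skips pushing down any predicate whose hash is already known, which prevents the exponential growth of repeated conditions on deep queries with many JOINs. A small standalone sketch of the dedup-by-hash idea, using normalized predicate text in place of IAST::getTreeHash:

```cpp
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Stand-in for IAST::getTreeHash(): here a predicate is identified by its normalized text.
using PredicateHash = std::string;

static PredicateHash treeHash(const std::string & predicate) { return predicate; }

// Push down only predicates that the subquery does not already contain.
static std::vector<std::string> pushDown(const std::vector<std::string> & existing_conjuncts,
                                         const std::vector<std::string> & candidates)
{
    std::set<PredicateHash> hashes;
    for (const auto & pred : existing_conjuncts)
        hashes.insert(treeHash(pred));

    std::vector<std::string> added;
    for (const auto & pred : candidates)
    {
        auto hash = treeHash(pred);
        if (hashes.count(hash))
            continue;          /// already present: do not add the same condition twice
        hashes.insert(hash);
        added.push_back(pred);
    }
    return added;
}

int main()
{
    auto added = pushDown({"x.number = d9.number"},
                          {"x.number = d9.number", "x.number = d1.number"});
    for (const auto & p : added)
        std::cout << "pushed down: " << p << '\n';   // only "x.number = d1.number"
}
```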
|
@ -6,26 +6,24 @@
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypeMap.h>
|
||||
#include <DataTypes/DataTypeObject.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeFixedString.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeDateTime64.h>
|
||||
#include <DataTypes/DataTypeEnum.h>
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
|
||||
#include <Core/AccurateComparison.h>
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/NaNUtils.h>
|
||||
#include <Common/FieldVisitorToString.h>
|
||||
|
||||
#include <Common/FieldVisitorConvertToNumber.h>
|
||||
#include <Common/DateLUT.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -223,7 +221,7 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
|
||||
&& (which_from_type.isNativeInt() || which_from_type.isNativeUInt() || which_from_type.isDate() || which_from_type.isDate32() || which_from_type.isDateTime() || which_from_type.isDateTime64()))
|
||||
{
|
||||
const auto scale = static_cast<const DataTypeDateTime64 &>(type).getScale();
|
||||
const auto decimal_value = DecimalUtils::decimalFromComponents<DateTime64>(src.reinterpret<Int64>(), 0, scale);
|
||||
const auto decimal_value = DecimalUtils::decimalFromComponents<DateTime64>(applyVisitor(FieldVisitorConvertToNumber<Int64>(), src), 0, scale);
|
||||
return Field(DecimalField<DateTime64>(decimal_value, scale));
|
||||
}
|
||||
}
|
||||
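The convertFieldToType change above reads the source value through `FieldVisitorConvertToNumber<Int64>` (a proper numeric conversion) instead of reinterpreting raw bits, and then builds the DateTime64 decimal from whole/fractional components and a scale. A quick sketch of what that component assembly amounts to for non-negative scales (illustrative arithmetic, not the DecimalUtils API):

```cpp
#include <cstdint>
#include <iostream>

/// DateTime64 stores "whole * 10^scale + fractional" in a single integer.
static int64_t decimalFromComponents(int64_t whole, int64_t fractional, uint32_t scale)
{
    int64_t multiplier = 1;
    for (uint32_t i = 0; i < scale; ++i)
        multiplier *= 10;            // assumes no overflow for the ranges used here
    return whole * multiplier + fractional;
}

int main()
{
    /// 2022-01-01 00:00:00 UTC as seconds; scale 3 gives milliseconds since epoch.
    int64_t seconds = 1640995200;
    std::cout << decimalFromComponents(seconds, 0, 3) << '\n';   // 1640995200000
}
```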
|
@ -562,12 +562,16 @@ bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context)
|
||||
if (database_name != query_on_cluster->cluster)
|
||||
return false;
|
||||
|
||||
auto db = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
if (!db || db->getEngineName() != "Replicated")
|
||||
return false;
|
||||
auto database = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
if (database && database->shouldReplicateQuery(context, query_ptr))
|
||||
{
|
||||
/// It's Replicated database and query is replicated on database level,
|
||||
/// so ON CLUSTER clause is redundant.
|
||||
query_on_cluster->cluster.clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
query_on_cluster->cluster.clear();
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ bool removeJoin(ASTSelectQuery & select, TreeRewriterResult & rewriter_result, C
|
||||
const size_t left_table_pos = 0;
|
||||
/// Test each argument of `and` function and select ones related to only left table
|
||||
std::shared_ptr<ASTFunction> new_conj = makeASTFunction("and");
|
||||
for (auto && node : collectConjunctions(where))
|
||||
for (auto && node : splitConjunctionsAst(where))
|
||||
{
|
||||
if (membership_collector.getIdentsMembership(node) == left_table_pos)
|
||||
new_conj->arguments->children.push_back(std::move(node));
|
||||
|
@ -55,7 +55,7 @@ ORCBlockOutputFormat::ORCBlockOutputFormat(WriteBuffer & out_, const Block & hea
|
||||
data_types.push_back(recursiveRemoveLowCardinality(type));
|
||||
}
|
||||
|
||||
ORC_UNIQUE_PTR<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
|
||||
std::unique_ptr<orc::Type> ORCBlockOutputFormat::getORCType(const DataTypePtr & type)
|
||||
{
|
||||
switch (type->getTypeId())
|
||||
{
|
||||
@ -398,17 +398,21 @@ void ORCBlockOutputFormat::writeColumn(
|
||||
const auto & list_column = assert_cast<const ColumnArray &>(column);
|
||||
auto nested_type = assert_cast<const DataTypeArray &>(*type).getNestedType();
|
||||
const ColumnArray::Offsets & offsets = list_column.getOffsets();
|
||||
list_orc_column.resize(list_column.size());
|
||||
|
||||
size_t column_size = list_column.size();
|
||||
list_orc_column.resize(column_size);
|
||||
|
||||
/// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i].
|
||||
list_orc_column.offsets[0] = 0;
|
||||
for (size_t i = 0; i != list_column.size(); ++i)
|
||||
for (size_t i = 0; i != column_size; ++i)
|
||||
{
|
||||
list_orc_column.offsets[i + 1] = offsets[i];
|
||||
list_orc_column.notNull[i] = 1;
|
||||
}
|
||||
|
||||
orc::ColumnVectorBatch & nested_orc_column = *list_orc_column.elements;
|
||||
writeColumn(nested_orc_column, list_column.getData(), nested_type, null_bytemap);
|
||||
list_orc_column.numElements = list_column.size();
|
||||
list_orc_column.numElements = column_size;
|
||||
break;
|
||||
}
|
||||
case TypeIndex::Tuple:
|
||||
@ -429,10 +433,12 @@ void ORCBlockOutputFormat::writeColumn(
|
||||
const auto & map_type = assert_cast<const DataTypeMap &>(*type);
|
||||
const ColumnArray::Offsets & offsets = list_column.getOffsets();
|
||||
|
||||
size_t column_size = list_column.size();
|
||||
|
||||
map_orc_column.resize(list_column.size());
|
||||
/// The length of list i in ListVectorBatch is offsets[i+1] - offsets[i].
|
||||
map_orc_column.offsets[0] = 0;
|
||||
for (size_t i = 0; i != list_column.size(); ++i)
|
||||
for (size_t i = 0; i != column_size; ++i)
|
||||
{
|
||||
map_orc_column.offsets[i + 1] = offsets[i];
|
||||
map_orc_column.notNull[i] = 1;
|
||||
@ -447,7 +453,7 @@ void ORCBlockOutputFormat::writeColumn(
|
||||
auto value_type = map_type.getValueType();
|
||||
writeColumn(values_orc_column, *nested_columns[1], value_type, null_bytemap);
|
||||
|
||||
map_orc_column.numElements = list_column.size();
|
||||
map_orc_column.numElements = column_size;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@ -461,8 +467,9 @@ size_t ORCBlockOutputFormat::getColumnSize(const IColumn & column, DataTypePtr &
|
||||
{
|
||||
auto nested_type = assert_cast<const DataTypeArray &>(*type).getNestedType();
|
||||
const IColumn & nested_column = assert_cast<const ColumnArray &>(column).getData();
|
||||
return getColumnSize(nested_column, nested_type);
|
||||
return std::max(column.size(), getColumnSize(nested_column, nested_type));
|
||||
}
|
||||
|
||||
return column.size();
|
||||
}
|
||||
|
||||
@ -471,9 +478,7 @@ size_t ORCBlockOutputFormat::getMaxColumnSize(Chunk & chunk)
|
||||
size_t columns_num = chunk.getNumColumns();
|
||||
size_t max_column_size = 0;
|
||||
for (size_t i = 0; i != columns_num; ++i)
|
||||
{
|
||||
max_column_size = std::max(max_column_size, getColumnSize(*chunk.getColumns()[i], data_types[i]));
|
||||
}
|
||||
return max_column_size;
|
||||
}
|
||||
|
||||
@ -481,18 +486,23 @@ void ORCBlockOutputFormat::consume(Chunk chunk)
|
||||
{
|
||||
if (!writer)
|
||||
prepareWriter();
|
||||
|
||||
size_t columns_num = chunk.getNumColumns();
|
||||
size_t rows_num = chunk.getNumRows();
|
||||
|
||||
/// getMaxColumnSize is needed to write arrays.
|
||||
/// The size of the batch must be no less than total amount of array elements.
|
||||
ORC_UNIQUE_PTR<orc::ColumnVectorBatch> batch = writer->createRowBatch(getMaxColumnSize(chunk));
|
||||
/// The size of the batch must be no less than total amount of array elements
|
||||
/// and no less than the number of rows (ORC writes a null bit for every row).
|
||||
std::unique_ptr<orc::ColumnVectorBatch> batch = writer->createRowBatch(getMaxColumnSize(chunk));
|
||||
orc::StructVectorBatch & root = dynamic_cast<orc::StructVectorBatch &>(*batch);
|
||||
|
||||
auto columns = chunk.detachColumns();
|
||||
for (auto & column : columns)
|
||||
column = recursiveRemoveLowCardinality(column);
|
||||
|
||||
for (size_t i = 0; i != columns_num; ++i)
|
||||
writeColumn(*root.fields[i], *columns[i], data_types[i], nullptr);
|
||||
|
||||
root.numElements = rows_num;
|
||||
writer->add(*batch);
|
||||
}
|
||||
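Per the new comment in the hunk above, the ORC row batch must be large enough both for all nested array elements and for the per-row null flags, so getColumnSize now returns the maximum of the column's own row count and its nested element count. A standalone sketch of that sizing rule over a toy column description (names are illustrative):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Toy column: a number of top-level rows and, for Array columns, a nested data column.
struct ColumnDesc
{
    size_t rows = 0;
    const ColumnDesc * nested = nullptr;   /// non-null for Array columns
};

static size_t getColumnSize(const ColumnDesc & column)
{
    if (column.nested)
        /// Batch must hold every nested element, but also one null flag per top-level row.
        return std::max(column.rows, getColumnSize(*column.nested));
    return column.rows;
}

static size_t getMaxColumnSize(const std::vector<ColumnDesc> & columns)
{
    size_t max_size = 0;
    for (const auto & column : columns)
        max_size = std::max(max_size, getColumnSize(column));
    return max_size;
}

int main()
{
    ColumnDesc nested{2, nullptr};          /// 10 rows of arrays, only 2 elements in total
    ColumnDesc arrays{10, &nested};
    ColumnDesc plain{10, nullptr};
    std::cout << getMaxColumnSize({arrays, plain}) << '\n';   // 10, not 2
}
```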
|
@ -8,11 +8,13 @@
|
||||
#include <Formats/FormatSettings.h>
|
||||
#include <orc/OrcFile.hh>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class WriteBuffer;
|
||||
|
||||
|
||||
/// orc::Writer writes only in orc::OutputStream
|
||||
class ORCOutputStream : public orc::OutputStream
|
||||
{
|
||||
@ -21,7 +23,7 @@ public:
|
||||
|
||||
uint64_t getLength() const override;
|
||||
uint64_t getNaturalWriteSize() const override;
|
||||
void write(const void* buf, size_t length) override;
|
||||
void write(const void * buf, size_t length) override;
|
||||
|
||||
void close() override {}
|
||||
const std::string& getName() const override { return name; }
|
||||
@ -31,6 +33,7 @@ private:
|
||||
std::string name = "ORCOutputStream";
|
||||
};
|
||||
|
||||
|
||||
class ORCBlockOutputFormat : public IOutputFormat
|
||||
{
|
||||
public:
|
||||
@ -42,7 +45,7 @@ private:
|
||||
void consume(Chunk chunk) override;
|
||||
void finalizeImpl() override;
|
||||
|
||||
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type);
|
||||
std::unique_ptr<orc::Type> getORCType(const DataTypePtr & type);
|
||||
|
||||
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be
|
||||
/// converted to unsigned char (bugprone-signed-char-misuse in clang).
|
||||
@ -75,8 +78,8 @@ private:
|
||||
const FormatSettings format_settings;
|
||||
ORCOutputStream output_stream;
|
||||
DataTypes data_types;
|
||||
ORC_UNIQUE_PTR<orc::Writer> writer;
|
||||
ORC_UNIQUE_PTR<orc::Type> schema;
|
||||
std::unique_ptr<orc::Writer> writer;
|
||||
std::unique_ptr<orc::Type> schema;
|
||||
orc::WriterOptions options;
|
||||
};
|
||||
|
||||
|
@ -242,11 +242,21 @@ bool MergeTreeIndexConditionBloomFilter::traverseAtomAST(const ASTPtr & node, Bl
|
||||
DataTypePtr const_type;
|
||||
if (KeyCondition::getConstant(node, block_with_constants, const_value, const_type))
|
||||
{
|
||||
if (const_value.getType() == Field::Types::UInt64 || const_value.getType() == Field::Types::Int64 ||
|
||||
const_value.getType() == Field::Types::Float64)
|
||||
if (const_value.getType() == Field::Types::UInt64)
|
||||
{
|
||||
/// Zero in all types is represented in memory the same way as in UInt64.
|
||||
out.function = const_value.reinterpret<UInt64>() ? RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE;
|
||||
out.function = const_value.get<UInt64>() ? RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (const_value.getType() == Field::Types::Int64)
|
||||
{
|
||||
out.function = const_value.get<Int64>() ? RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (const_value.getType() == Field::Types::Float64)
|
||||
{
|
||||
out.function = const_value.get<Float64>() ? RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
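The bloom-filter index change reads the constant through `get<T>()` for each supported type rather than reinterpreting its bits as UInt64. One observable difference between the two approaches: the Float64 value -0.0 is false as a value, but its bit pattern is non-zero, so a raw reinterpretation would classify it as ALWAYS_TRUE. A minimal demonstration of that distinction:

```cpp
#include <bit>
#include <cstdint>
#include <iostream>

int main()
{
    double negative_zero = -0.0;

    /// Bit-pattern view (what Field::reinterpret<UInt64>() exposed): non-zero.
    uint64_t bits = std::bit_cast<uint64_t>(negative_zero);
    std::cout << "non-zero bits:  " << (bits != 0) << '\n';            // 1 -> would mean ALWAYS_TRUE

    /// Typed view (what get<Float64>() exposes): the value itself is zero.
    std::cout << "non-zero value: " << (negative_zero != 0.0) << '\n'; // 0 -> ALWAYS_FALSE
}
```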
|
@ -1,4 +1,5 @@
|
||||
#include <Common/config.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include "IO/ParallelReadBuffer.h"
|
||||
#include "IO/IOThreadPool.h"
|
||||
#include "Parsers/ASTCreateQuery.h"
|
||||
@ -63,6 +64,12 @@ namespace fs = std::filesystem;
|
||||
|
||||
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event DeleteS3Objects;
|
||||
extern const Event ListS3Objects;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -164,6 +171,7 @@ private:
|
||||
{
|
||||
buffer.clear();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ListS3Objects);
|
||||
outcome = client.ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
@ -559,6 +567,7 @@ static bool checkIfObjectExists(const std::shared_ptr<const Aws::S3::S3Client> &
|
||||
request.SetPrefix(key);
|
||||
while (!is_finished)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::ListS3Objects);
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
@ -1036,6 +1045,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
|
||||
delkeys.AddObjects(std::move(obj));
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::DeleteS3Objects);
|
||||
Aws::S3::Model::DeleteObjectsRequest request;
|
||||
request.SetBucket(s3_configuration.uri.bucket);
|
||||
request.SetDelete(delkeys);
|
||||
|
@ -146,6 +146,8 @@ def test_orc_groupby(started_cluster):
|
||||
node = started_cluster.instances["h0_0_0"]
|
||||
result = node.query(
|
||||
"""
|
||||
DROP TABLE IF EXISTS default.demo_orc;
|
||||
CREATE TABLE default.demo_orc (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo_orc') PARTITION BY(day);
|
||||
SELECT day, count(*) FROM default.demo_orc group by day order by day
|
||||
"""
|
||||
)
|
||||
@ -329,6 +331,8 @@ def test_text_count(started_cluster):
|
||||
node = started_cluster.instances["h0_0_0"]
|
||||
result = node.query(
|
||||
"""
|
||||
DROP TABLE IF EXISTS default.demo_orc;
|
||||
CREATE TABLE default.demo_orc (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo_orc') PARTITION BY(day);
|
||||
SELECT day, count(*) FROM default.demo_orc group by day order by day SETTINGS format_csv_delimiter = '\x01'
|
||||
"""
|
||||
)
|
||||
|
@ -56,6 +56,17 @@ init_list = {
|
||||
"DiskS3WriteRequestsErrorsTotal": 0,
|
||||
"DiskS3WriteRequestsErrors503": 0,
|
||||
"DiskS3WriteRequestsRedirects": 0,
|
||||
"DeleteS3Objects": 0,
|
||||
"CopyS3Object": 0,
|
||||
"ListS3Objects": 0,
|
||||
"HeadS3Object": 0,
|
||||
"CreateS3MultipartUpload": 0,
|
||||
"UploadS3PartCopy": 0,
|
||||
"UploadS3Part": 0,
|
||||
"AbortS3MultipartUpload": 0,
|
||||
"CompleteS3MultipartUpload": 0,
|
||||
"PutS3ObjectRequest": 0,
|
||||
"GetS3ObjectRequest": 0,
|
||||
}
|
||||
|
||||
|
||||
|
@ -354,6 +354,42 @@ def test_alter_drop_detached_part(started_cluster, engine):
|
||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||
def test_alter_drop_partition(started_cluster, engine):
|
||||
main_node.query(
|
||||
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard1', 'replica1');"
|
||||
)
|
||||
dummy_node.query(
|
||||
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard1', 'replica2');"
|
||||
)
|
||||
snapshotting_node.query(
|
||||
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard2', 'replica1');"
|
||||
)
|
||||
|
||||
table = f"alter_drop_partition.alter_drop_{engine}"
|
||||
main_node.query(
|
||||
f"CREATE TABLE {table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
||||
)
|
||||
main_node.query(f"INSERT INTO {table} VALUES (123)")
|
||||
if engine == "MergeTree":
|
||||
dummy_node.query(f"INSERT INTO {table} VALUES (456)")
|
||||
snapshotting_node.query(f"INSERT INTO {table} VALUES (789)")
|
||||
main_node.query(
|
||||
f"ALTER TABLE {table} ON CLUSTER alter_drop_partition DROP PARTITION ID 'all'",
|
||||
settings={"replication_alter_partitions_sync": 2},
|
||||
)
|
||||
assert (
|
||||
main_node.query(
|
||||
f"SELECT CounterID FROM clusterAllReplicas('alter_drop_partition', {table})"
|
||||
)
|
||||
== ""
|
||||
)
|
||||
assert dummy_node.query(f"SELECT CounterID FROM {table}") == ""
|
||||
main_node.query("DROP DATABASE alter_drop_partition")
|
||||
dummy_node.query("DROP DATABASE alter_drop_partition")
|
||||
snapshotting_node.query("DROP DATABASE alter_drop_partition")
|
||||
|
||||
|
||||
def test_alter_fetch(started_cluster):
|
||||
main_node.query(
|
||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
||||
|
4
tests/performance/lower_upper_utf8.xml
Normal file
@ -0,0 +1,4 @@
|
||||
<test>
|
||||
<query>SELECT lowerUTF8(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
|
||||
<query>SELECT upperUTF8(SearchPhrase) FROM hits_100m_single FORMAT Null</query>
|
||||
</test>
|
@ -5,7 +5,7 @@
|
||||
a String,
|
||||
b_count AggregateFunction(uniq, UInt64)
|
||||
) Engine=MergeTree partition by tuple()
|
||||
ORDER by tuple()
|
||||
ORDER by a
|
||||
SETTINGS index_granularity = 1024;
|
||||
</create_query>
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
a String,
|
||||
b_count AggregateFunction(uniq, String)
|
||||
) Engine=MergeTree partition by tuple()
|
||||
ORDER by tuple()
|
||||
ORDER by a
|
||||
SETTINGS index_granularity = 1024;
|
||||
</create_query>
|
||||
|
||||
@ -28,7 +28,7 @@
|
||||
SELECT a, uniqState(b) b_count
|
||||
FROM
|
||||
(
|
||||
SELECT toString(rand() % 1000) a, toString(number % 10000) b
|
||||
SELECT toString(intDiv(number, 20000)) a, toString(number % 10000) b
|
||||
FROM numbers_mt(20000000)
|
||||
)
|
||||
GROUP BY a
|
||||
|
@ -0,0 +1,3 @@
|
||||
0 1
|
||||
-2 2
|
||||
-2 2
|
5
tests/queries/0_stateless/01655_window_functions_bug.sql
Normal file
@ -0,0 +1,5 @@
|
||||
SELECT round((countIf(rating = 5)) - (countIf(rating < 5)), 4) as nps,
|
||||
dense_rank() OVER (ORDER BY nps DESC) as rank
|
||||
FROM (select number as rating, number%3 rest_id from numbers(10))
|
||||
group by rest_id
|
||||
order by rank;
|
@ -0,0 +1,11 @@
|
||||
-- { echoOn }
|
||||
-- NOTE: total string size should be > 16 (sizeof(__m128i))
|
||||
insert into utf8_overlap values ('\xe2'), ('Foo⚊BarBazBam'), ('\xe2'), ('Foo⚊BarBazBam');
|
||||
-- ^
|
||||
-- MONOGRAM FOR YANG
|
||||
with lowerUTF8(str) as l_, upperUTF8(str) as u_, '0x' || hex(str) as h_
|
||||
select length(str), if(l_ == '\xe2', h_, l_), if(u_ == '\xe2', h_, u_) from utf8_overlap format CSV;
|
||||
1,"0xE2","0xE2"
|
||||
15,"foo⚊barbazbam","FOO⚊BARBAZBAM"
|
||||
1,"0xE2","0xE2"
|
||||
15,"foo⚊barbazbam","FOO⚊BARBAZBAM"
|
@ -0,0 +1,10 @@
|
||||
drop table if exists utf8_overlap;
|
||||
create table utf8_overlap (str String) engine=Memory();
|
||||
|
||||
-- { echoOn }
|
||||
-- NOTE: total string size should be > 16 (sizeof(__m128i))
|
||||
insert into utf8_overlap values ('\xe2'), ('Foo⚊BarBazBam'), ('\xe2'), ('Foo⚊BarBazBam');
|
||||
-- ^
|
||||
-- MONOGRAM FOR YANG
|
||||
with lowerUTF8(str) as l_, upperUTF8(str) as u_, '0x' || hex(str) as h_
|
||||
select length(str), if(l_ == '\xe2', h_, l_), if(u_ == '\xe2', h_, u_) from utf8_overlap format CSV;
|
@ -8,5 +8,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
USER_FILES_PATH=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
|
||||
cp $CUR_DIR/data_orc/corrupted.orc $USER_FILES_PATH/
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="select * from file('corrupted.orc')" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL'
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="select * from file('corrupted.orc')" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL'
|
||||
|
@ -1,5 +1,5 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-fasttest
|
||||
# Tags: no-fasttest, no-parallel
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
@ -12,7 +12,6 @@ DATA_FILE=$USER_FILES_PATH/$FILE_NAME
|
||||
cp $CUR_DIR/data_parquet_bad_column/metadata_0.parquet $DATA_FILE
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "desc file(test_02245.parquet)" 2>&1 | grep -qF "CANNOT_EXTRACT_TABLE_STRUCTURE" && echo "OK" || echo "FAIL"
|
||||
$CLICKHOUSE_CLIENT -q "desc file(test_02245.parquet)" 2>&1 | grep -qF "Cannot extract table structure" && echo "OK" || echo "FAIL"
|
||||
$CLICKHOUSE_CLIENT -q "desc file(test_02245.parquet) settings input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference=1"
|
||||
$CLICKHOUSE_CLIENT -q "select count(*) from file(test_02245.parquet) settings input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference=1"
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
-- Tags: no-fasttest, no-parallel
|
||||
|
||||
insert into function file('02267_data2.jsonl') select NULL as x;
|
||||
insert into function file('02267_data3.jsonl') select * from numbers(0);
|
||||
insert into function file('02267_data4.jsonl') select 1 as x;
|
||||
@ -7,4 +8,4 @@ select * from file('02267_data*.jsonl') order by x;
|
||||
insert into function file('02267_data1.jsonl', 'TSV') select 1 as x;
|
||||
insert into function file('02267_data1.jsonl', 'TSV') select [1,2,3] as x;
|
||||
|
||||
select * from file('02267_data*.jsonl') settings schema_inference_use_cache_for_file=0; --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
|
||||
select * from file('02267_data*.jsonl') settings schema_inference_use_cache_for_file=0; --{serverError INCORRECT_DATA}
|
||||
|
@ -1,6 +1,7 @@
|
||||
-- Tags: no-fasttest
|
||||
|
||||
insert into function file('02268_data.jsonl', 'TSV') select 1;
|
||||
select * from file('02268_data.jsonl'); --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
|
||||
select * from file('02268_data.jsonl'); --{serverError 117}
|
||||
|
||||
insert into function file('02268_data.jsonCompactEachRow', 'TSV') select 1;
|
||||
select * from file('02268_data.jsonCompactEachRow'); --{serverError CANNOT_EXTRACT_TABLE_STRUCTURE}
|
||||
|
||||
select * from file('02268_data.jsonCompactEachRow'); --{serverError 117}
|
||||
|
@ -23,7 +23,7 @@ $CLICKHOUSE_CLIENT -q "desc file(dump1.sql, MySQLDump) settings input_format_mys
|
||||
$CLICKHOUSE_CLIENT -q "select * from file(dump1.sql, MySQLDump) settings input_format_mysql_dump_table_name='test'"
|
||||
$CLICKHOUSE_CLIENT -q "desc file(dump1.sql, MySQLDump) settings input_format_mysql_dump_table_name='test2'"
|
||||
$CLICKHOUSE_CLIENT -q "select * from file(dump1.sql, MySQLDump) settings input_format_mysql_dump_table_name='test2'"
|
||||
$CLICKHOUSE_CLIENT -q "desc file(dump1.sql, MySQLDump) settings input_format_mysql_dump_table_name='test 3'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL'
|
||||
$CLICKHOUSE_CLIENT -q "desc file(dump1.sql, MySQLDump) settings input_format_mysql_dump_table_name='test 3'" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL'
|
||||
$CLICKHOUSE_CLIENT -q "select * from file(dump1.sql, MySQLDump, 'x Nullable(Int32)') settings input_format_mysql_dump_table_name='test 3'" 2>&1 | grep -F -q 'EMPTY_DATA_PASSED' && echo 'OK' || echo 'FAIL'
|
||||
|
||||
echo "dump2"
|
||||
@ -146,4 +146,3 @@ $CLICKHOUSE_CLIENT -q "desc file(dump15.sql, MySQLDump) settings input_format_my
|
||||
$CLICKHOUSE_CLIENT -q "select * from file(dump15.sql, MySQLDump) settings input_format_mysql_dump_table_name='test 3'"
|
||||
|
||||
rm $USER_FILES_PATH/dump*.sql
|
||||
|
||||
|
@ -88,6 +88,4 @@ echo '
|
||||
}
|
||||
' > $DATA_FILE
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "desc file(data_02293, JSONColumns) settings input_format_max_rows_to_read_for_schema_inference=3" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL'
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "desc file(data_02293, JSONColumns) settings input_format_max_rows_to_read_for_schema_inference=3" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL'
|
||||
|
@ -15,11 +15,11 @@ mkdir -p $SCHEMADIR/$SERVER_SCHEMADIR
|
||||
cp -r $CLIENT_SCHEMADIR/02327_* $SCHEMADIR/$SERVER_SCHEMADIR/
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="desc file(data.pb) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
|
||||
$CLICKHOUSE_CLIENT --query="desc file(data.capnp) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
|
||||
$CLICKHOUSE_CLIENT --query="desc file(data.pb) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="desc file(data.capnp) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="create table test_protobuf engine=File(Protobuf) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="create table test_capnp engine=File(CapnProto) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'CANNOT_EXTRACT_TABLE_STRUCTURE' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="create table test_protobuf engine=File(Protobuf) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="create table test_capnp engine=File(CapnProto) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty'" 2>&1 | grep -F -q 'Cannot extract table structure' && echo 'OK' || echo 'FAIL';
$CLICKHOUSE_CLIENT --query="desc file(data.pb) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty', input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference=1";
$CLICKHOUSE_CLIENT --query="desc file(data.capnp) settings format_schema='$SERVER_SCHEMADIR/02327_schema:MessageWithEmpty', input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference=1";
@ -2,6 +2,18 @@
-- https://github.com/ClickHouse/ClickHouse/issues/21557

EXPLAIN SYNTAX
WITH
x AS ( SELECT number FROM numbers(10) ),
cross_sales AS (
SELECT 1 AS xx
FROM x, x AS d1, x AS d2, x AS d3, x AS d4, x AS d5, x AS d6, x AS d7, x AS d8, x AS d9
WHERE x.number = d9.number
)
SELECT xx FROM cross_sales WHERE xx = 2000 FORMAT Null;

SET max_analyze_depth = 1;

EXPLAIN SYNTAX
WITH
x AS ( SELECT number FROM numbers(10) ),
@ -0,0 +1,37 @@
-- { echoOn }
SELECT groupArray(tuple(value)) OVER ()
FROM (select number value from numbers(10))
ORDER BY value ASC;
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
[(0),(1),(2),(3),(4),(5),(6),(7),(8),(9)]
SELECT count() OVER (ORDER BY number + 1) FROM numbers(10) ORDER BY number;
1
2
3
4
5
6
7
8
9
10
SELECT count() OVER (ORDER BY number + 1) + 1 as foo FROM numbers(10)
ORDER BY foo;
2
3
4
5
6
7
8
9
10
11
@ -0,0 +1,9 @@
-- { echoOn }
SELECT groupArray(tuple(value)) OVER ()
FROM (select number value from numbers(10))
ORDER BY value ASC;

SELECT count() OVER (ORDER BY number + 1) FROM numbers(10) ORDER BY number;

SELECT count() OVER (ORDER BY number + 1) + 1 as foo FROM numbers(10)
ORDER BY foo;
@ -0,0 +1,10 @@
{1:'\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç'}
{1:[{1:['\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç','\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç']}]}
[['\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç','\0\nuð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhä\rÓH%uULÕsE|Ç'],[]]
[[{1:'\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç'}],[]]
['\0\n\0\0\0\0uð¨,ËÂ4‡Æ£“Õñô÷çƒeÎØø\tÝÑhäuULÕsE|Ç']
10
{1:10}
{1:[{1:[10,10]}]}
[[10,10],[]]
[[{1:10}],[]]