Merge remote-tracking branch 'origin/master' into pr-local-plan

Igor Nikonov 2024-07-02 20:40:04 +00:00
commit 3b31590477
31 changed files with 762 additions and 274 deletions

View File

@ -235,8 +235,6 @@ namespace Net
/// Note that simply closing a socket is not sufficient /// Note that simply closing a socket is not sufficient
/// to be able to re-use it again. /// to be able to re-use it again.
Poco::Timespan getMaxTimeout();
private: private:
SecureSocketImpl(const SecureSocketImpl &); SecureSocketImpl(const SecureSocketImpl &);
SecureSocketImpl & operator=(const SecureSocketImpl &); SecureSocketImpl & operator=(const SecureSocketImpl &);
@ -250,6 +248,9 @@ namespace Net
Session::Ptr _pSession; Session::Ptr _pSession;
friend class SecureStreamSocketImpl; friend class SecureStreamSocketImpl;
Poco::Timespan getMaxTimeoutOrLimit();
/// Return max(send, receive) if non-zero, otherwise the maximum timeout
}; };

View File

@ -199,7 +199,7 @@ void SecureSocketImpl::connectSSL(bool performHandshake)
if (performHandshake && _pSocket->getBlocking()) if (performHandshake && _pSocket->getBlocking())
{ {
int ret; int ret;
Poco::Timespan remaining_time = getMaxTimeout(); Poco::Timespan remaining_time = getMaxTimeoutOrLimit();
do do
{ {
RemainingTimeCounter counter(remaining_time); RemainingTimeCounter counter(remaining_time);
@ -302,7 +302,7 @@ int SecureSocketImpl::sendBytes(const void* buffer, int length, int flags)
return rc; return rc;
} }
Poco::Timespan remaining_time = getMaxTimeout(); Poco::Timespan remaining_time = getMaxTimeoutOrLimit();
do do
{ {
RemainingTimeCounter counter(remaining_time); RemainingTimeCounter counter(remaining_time);
@ -338,7 +338,7 @@ int SecureSocketImpl::receiveBytes(void* buffer, int length, int flags)
return rc; return rc;
} }
Poco::Timespan remaining_time = getMaxTimeout(); Poco::Timespan remaining_time = getMaxTimeoutOrLimit();
do do
{ {
/// SSL record may consist of several TCP packets, /// SSL record may consist of several TCP packets,
@ -372,7 +372,7 @@ int SecureSocketImpl::completeHandshake()
poco_check_ptr (_pSSL); poco_check_ptr (_pSSL);
int rc; int rc;
Poco::Timespan remaining_time = getMaxTimeout(); Poco::Timespan remaining_time = getMaxTimeoutOrLimit();
do do
{ {
RemainingTimeCounter counter(remaining_time); RemainingTimeCounter counter(remaining_time);
@ -453,18 +453,29 @@ X509* SecureSocketImpl::peerCertificate() const
return 0; return 0;
} }
Poco::Timespan SecureSocketImpl::getMaxTimeout() Poco::Timespan SecureSocketImpl::getMaxTimeoutOrLimit()
{ {
std::lock_guard<std::recursive_mutex> lock(_mutex); std::lock_guard<std::recursive_mutex> lock(_mutex);
Poco::Timespan remaining_time = _pSocket->getReceiveTimeout(); Poco::Timespan remaining_time = _pSocket->getReceiveTimeout();
Poco::Timespan send_timeout = _pSocket->getSendTimeout(); Poco::Timespan send_timeout = _pSocket->getSendTimeout();
if (remaining_time < send_timeout) if (remaining_time < send_timeout)
remaining_time = send_timeout; remaining_time = send_timeout;
/// zero SO_SNDTIMEO/SO_RCVTIMEO works as no timeout, let's replicate this
///
/// NOTE: we cannot use INT64_MAX (std::numeric_limits<Poco::Timespan::TimeDiff>::max()),
/// since it will later be passed to poll(), which accepts an int timeout, and
/// even though poll() accepts milliseconds and Timespan() accepts
/// microseconds, let's use a smaller maximum value just to avoid possible
/// issues; this should be enough anyway (it is ~24 days).
if (remaining_time == 0)
remaining_time = Poco::Timespan(std::numeric_limits<int>::max());
return remaining_time; return remaining_time;
} }
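The function above replaces `getMaxTimeout()` with `getMaxTimeoutOrLimit()`: a zero `SO_SNDTIMEO`/`SO_RCVTIMEO` means "no timeout", so instead of letting the retry loop treat zero as "already expired", a large finite cap is substituted that can still be decremented and handed to `poll()`. A minimal standalone sketch of that idea (names are illustrative, not Poco's API):

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>

// Effective timeout for a blocking SSL retry loop: take the larger of the
// send/receive timeouts; if both are zero (which the socket layer treats as
// "no timeout"), substitute a large but finite cap so the value can be
// decremented per iteration and safely passed down to poll().
int64_t effectiveTimeoutMicroseconds(int64_t receive_timeout_us, int64_t send_timeout_us)
{
    int64_t remaining = std::max(receive_timeout_us, send_timeout_us);
    if (remaining == 0)
        remaining = std::numeric_limits<int>::max(); // finite stand-in for "wait forever"
    return remaining;
}
```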
bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time) bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time)
{ {
if (remaining_time == 0)
return false;
std::lock_guard<std::recursive_mutex> lock(_mutex); std::lock_guard<std::recursive_mutex> lock(_mutex);
if (rc <= 0) if (rc <= 0)
{ {
@ -475,9 +486,7 @@ bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time)
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
if (_pSocket->getBlocking()) if (_pSocket->getBlocking())
{ {
/// Level-triggered mode of epoll_wait is used, so if SSL_read don't read all available data from socket, if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_READ))
/// epoll_wait returns true without waiting for new data even if remaining_time == 0
if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_READ) && remaining_time != 0)
return true; return true;
else else
throw Poco::TimeoutException(); throw Poco::TimeoutException();
@ -486,13 +495,15 @@ bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time)
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
if (_pSocket->getBlocking()) if (_pSocket->getBlocking())
{ {
/// The same as for SSL_ERROR_WANT_READ if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_WRITE))
if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_WRITE) && remaining_time != 0)
return true; return true;
else else
throw Poco::TimeoutException(); throw Poco::TimeoutException();
} }
break; break;
/// NOTE: POCO_EINTR is the same as SSL_ERROR_WANT_READ (at least in
/// OpenSSL), so this is likely dead code, but let's leave it for
/// compatibility with other implementations
case SSL_ERROR_SYSCALL: case SSL_ERROR_SYSCALL:
return socketError == POCO_EAGAIN || socketError == POCO_EINTR; return socketError == POCO_EAGAIN || socketError == POCO_EINTR;
default: default:
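Taken together with the early `return false` in `mustRetry()`, the retry loops above now treat an exhausted `remaining_time` as a timeout instead of special-casing zero after `poll()`. A compilable sketch of that control flow, with the SSL call and the poll step passed in as callbacks (hypothetical names, not the Poco interfaces):

```cpp
#include <algorithm>
#include <chrono>
#include <functional>
#include <stdexcept>

// Rough shape of the blocking retry loop after this change: each iteration
// subtracts its own duration from remaining_time (the RemainingTimeCounter
// role), and the mustRetry-style check refuses to wait once the budget is
// exhausted. remaining_time == 0 now simply means "expired", because the
// "no timeout" case was already replaced by a finite cap.
void retryLoop(std::chrono::microseconds remaining_time,
               const std::function<bool()> & ssl_call_made_progress,
               const std::function<bool(std::chrono::microseconds)> & poll_ready)
{
    while (true)
    {
        const auto start = std::chrono::steady_clock::now();
        if (ssl_call_made_progress())
            return;
        if (remaining_time.count() == 0 || !poll_ready(remaining_time))
            throw std::runtime_error("timeout while waiting on the SSL socket");
        const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now() - start);
        remaining_time -= std::min(elapsed, remaining_time);
    }
}
```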

View File

@ -173,7 +173,7 @@ See function [substring](string-functions.md#substring).
## bitTest ## bitTest
Takes any integer and converts it into [binary form](https://en.wikipedia.org/wiki/Binary_number), returns the value of a bit at specified position. The countdown starts from 0 from the right to the left. Takes any integer and converts it into [binary form](https://en.wikipedia.org/wiki/Binary_number), returns the value of a bit at specified position. Counting is right-to-left, starting at 0.
**Syntax** **Syntax**
@ -226,7 +226,7 @@ Result:
## bitTestAll ## bitTestAll
Returns result of [logical conjunction](https://en.wikipedia.org/wiki/Logical_conjunction) (AND operator) of all bits at given positions. The countdown starts from 0 from the right to the left. Returns result of [logical conjunction](https://en.wikipedia.org/wiki/Logical_conjunction) (AND operator) of all bits at given positions. Counting is right-to-left, starting at 0.
The conjunction for bit-wise operations: The conjunction for bit-wise operations:
@ -289,7 +289,7 @@ Result:
## bitTestAny ## bitTestAny
Returns result of [logical disjunction](https://en.wikipedia.org/wiki/Logical_disjunction) (OR operator) of all bits at given positions. The countdown starts from 0 from the right to the left. Returns result of [logical disjunction](https://en.wikipedia.org/wiki/Logical_disjunction) (OR operator) of all bits at given positions. Counting is right-to-left, starting at 0.
The disjunction for bit-wise operations: The disjunction for bit-wise operations:

View File

@ -3860,3 +3860,138 @@ Result:
└───────────────┘ └───────────────┘
``` ```
## transactionID
Returns the ID of a [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback).
:::note
This function is part of an experimental feature set. Enable experimental transaction support by adding this setting to your configuration:
```
<clickhouse>
<allow_experimental_transactions>1</allow_experimental_transactions>
</clickhouse>
```
For more information see the page [Transactional (ACID) support](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback).
:::
**Syntax**
```sql
transactionID()
```
**Returned value**
- Returns a tuple consisting of `start_csn`, `local_tid` and `host_id`. [Tuple](../data-types/tuple.md).
- `start_csn`: Global sequential number, the newest commit timestamp that was seen when this transaction began. [UInt64](../data-types/int-uint.md).
- `local_tid`: Local sequential number that is unique for each transaction started by this host within a specific start_csn. [UInt64](../data-types/int-uint.md).
- `host_id`: UUID of the host that has started this transaction. [UUID](../data-types/uuid.md).
**Example**
Query:
```sql
BEGIN TRANSACTION;
SELECT transactionID();
ROLLBACK;
```
Result:
```response
┌─transactionID()────────────────────────────────┐
│ (32,34,'0ee8b069-f2bb-4748-9eae-069c85b5252b') │
└────────────────────────────────────────────────┘
```
## transactionLatestSnapshot
Returns the newest snapshot (Commit Sequence Number) of a [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback) that is available for reading.
:::note
This function is part of an experimental feature set. Enable experimental transaction support by adding this setting to your configuration:
```
<clickhouse>
<allow_experimental_transactions>1</allow_experimental_transactions>
</clickhouse>
```
For more information see the page [Transactional (ACID) support](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback).
:::
**Syntax**
```sql
transactionLatestSnapshot()
```
**Returned value**
- Returns the latest snapshot (CSN) of a transaction. [UInt64](../data-types/int-uint.md)
**Example**
Query:
```sql
BEGIN TRANSACTION;
SELECT transactionLatestSnapshot();
ROLLBACK;
```
Result:
```response
┌─transactionLatestSnapshot()─┐
│ 32 │
└─────────────────────────────┘
```
## transactionOldestSnapshot
Returns the oldest snapshot (Commit Sequence Number) that is visible for some running [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback).
:::note
This function is part of an experimental feature set. Enable experimental transaction support by adding this setting to your configuration:
```
<clickhouse>
<allow_experimental_transactions>1</allow_experimental_transactions>
</clickhouse>
```
For more information see the page [Transactional (ACID) support](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback).
:::
**Syntax**
```sql
transactionOldestSnapshot()
```
**Returned value**
- Returns the oldest snapshot (CSN) of a transaction. [UInt64](../data-types/int-uint.md)
**Example**
Query:
```sql
BEGIN TRANSACTION;
SELECT transactionOldestSnapshot();
ROLLBACK;
```
Result:
```response
┌─transactionOldestSnapshot()─┐
│ 32 │
└─────────────────────────────┘
```

View File

@ -16,6 +16,7 @@ namespace ErrorCodes
{ {
extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int PARAMETER_OUT_OF_BOUND;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
} }
@ -146,6 +147,9 @@ private:
const auto pos = pos_col_const->getUInt(0); const auto pos = pos_col_const->getUInt(0);
if (pos < 8 * sizeof(ValueType)) if (pos < 8 * sizeof(ValueType))
mask = mask | (ValueType(1) << pos); mask = mask | (ValueType(1) << pos);
else
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"The bit position argument {} is out of bounds for number", static_cast<UInt64>(pos));
} }
else else
{ {
@ -186,13 +190,20 @@ private:
for (const auto i : collections::range(0, mask.size())) for (const auto i : collections::range(0, mask.size()))
if (pos[i] < 8 * sizeof(ValueType)) if (pos[i] < 8 * sizeof(ValueType))
mask[i] = mask[i] | (ValueType(1) << pos[i]); mask[i] = mask[i] | (ValueType(1) << pos[i]);
else
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"The bit position argument {} is out of bounds for number", static_cast<UInt64>(pos[i]));
return true; return true;
} }
else if (const auto pos_col_const = checkAndGetColumnConst<ColumnVector<PosType>>(pos_col_untyped)) else if (const auto pos_col_const = checkAndGetColumnConst<ColumnVector<PosType>>(pos_col_untyped))
{ {
const auto & pos = pos_col_const->template getValue<PosType>(); const auto & pos = pos_col_const->template getValue<PosType>();
const auto new_mask = pos < 8 * sizeof(ValueType) ? ValueType(1) << pos : 0; if (pos >= 8 * sizeof(ValueType))
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"The bit position argument {} is out of bounds for number", static_cast<UInt64>(pos));
const auto new_mask = ValueType(1) << pos;
for (const auto i : collections::range(0, mask.size())) for (const auto i : collections::range(0, mask.size()))
mask[i] = mask[i] | new_mask; mask[i] = mask[i] | new_mask;
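The added `PARAMETER_OUT_OF_BOUND` checks above make the `bitTestAll`/`bitTestAny`-style mask building reject out-of-range positions instead of silently skipping them. A compressed sketch of that rule as a standalone function (hypothetical name, not the ClickHouse API):

```cpp
#include <cstdint>
#include <initializer_list>
#include <stdexcept>

// Build the mask used by bitTestAll/bitTestAny-style checks: every requested
// position must fit into the bit width of the value type, otherwise the whole
// call fails instead of the position being silently ignored.
template <typename T>
T buildMaskChecked(std::initializer_list<unsigned> positions)
{
    T mask = 0;
    for (unsigned pos : positions)
    {
        if (pos >= 8 * sizeof(T))
            throw std::out_of_range("bit position out of bounds for the value type");
        mask = T(mask | (T(1) << pos));
    }
    return mask;
}
// bitTestAll(x, p...) then corresponds to (x & mask) == mask, bitTestAny to (x & mask) != 0.
```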

View File

@ -8,6 +8,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int PARAMETER_OUT_OF_BOUND;
} }
namespace namespace
@ -21,12 +22,21 @@ struct BitTestImpl
static const constexpr bool allow_string_integer = false; static const constexpr bool allow_string_integer = false;
template <typename Result = ResultType> template <typename Result = ResultType>
NO_SANITIZE_UNDEFINED static Result apply(A a [[maybe_unused]], B b [[maybe_unused]]) static Result apply(A a [[maybe_unused]], B b [[maybe_unused]])
{ {
if constexpr (is_big_int_v<A> || is_big_int_v<B>) if constexpr (is_big_int_v<A> || is_big_int_v<B>)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "bitTest is not implemented for big integers as second argument"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "bitTest is not implemented for big integers as second argument");
else else
return (typename NumberTraits::ToInteger<A>::Type(a) >> typename NumberTraits::ToInteger<B>::Type(b)) & 1; {
typename NumberTraits::ToInteger<A>::Type a_int = a;
typename NumberTraits::ToInteger<B>::Type b_int = b;
const auto max_position = static_cast<decltype(b)>((8 * sizeof(a)) - 1);
if (b_int > max_position || b_int < 0)
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"The bit position argument needs to a positive value and less or equal to {} for integer {}",
std::to_string(max_position), std::to_string(a_int));
return (a_int >> b_int) & 1;
}
} }
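With the `NO_SANITIZE_UNDEFINED` attribute dropped, `bitTest` now validates the position explicitly rather than relying on an out-of-range shift being tolerated. A minimal standalone sketch of the new semantics (hypothetical helper, not ClickHouse code):

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// Bit positions are counted right-to-left starting at 0 and must fit into the
// width of the first argument; anything else is an error instead of an
// undefined shift.
template <typename T>
bool bitTestChecked(T value, int64_t pos)
{
    const int64_t max_position = static_cast<int64_t>(8 * sizeof(T)) - 1;
    if (pos < 0 || pos > max_position)
        throw std::out_of_range(
            "bit position " + std::to_string(pos) + " is out of bounds for a "
            + std::to_string(8 * sizeof(T)) + "-bit value");
    return (value >> pos) & 1;
}
// bitTestChecked<uint8_t>(0b01010101, 0) == true; bitTestChecked<uint8_t>(0, 8) throws.
```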
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER

View File

@ -36,7 +36,6 @@ ColumnsDescription ObjectStorageQueueLogElement::getColumnsDescription()
{"status", status_datatype, "Status of the processing file"}, {"status", status_datatype, "Status of the processing file"},
{"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the start of processing the file"}, {"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the start of processing the file"},
{"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the end of processing the file"}, {"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the end of processing the file"},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "Profile events collected while loading this file"},
{"exception", std::make_shared<DataTypeString>(), "Exception message if happened"}, {"exception", std::make_shared<DataTypeString>(), "Exception message if happened"},
}; };
} }
@ -64,8 +63,6 @@ void ObjectStorageQueueLogElement::appendToBlock(MutableColumns & columns) const
else else
columns[i++]->insertDefault(); columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(counters_snapshot, columns[i++].get(), true);
columns[i++]->insert(exception); columns[i++]->insert(exception);
} }

View File

@ -26,7 +26,6 @@ struct ObjectStorageQueueLogElement
Failed, Failed,
}; };
ObjectStorageQueueStatus status; ObjectStorageQueueStatus status;
ProfileEvents::Counters::Snapshot counters_snapshot;
time_t processing_start_time; time_t processing_start_time;
time_t processing_end_time; time_t processing_end_time;
std::string exception; std::string exception;

View File

@ -125,7 +125,7 @@ void Chunk::addColumn(size_t position, ColumnPtr column)
if (position >= columns.size()) if (position >= columns.size())
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
"Position {} out of bound in Chunk::addColumn(), max position = {}", "Position {} out of bound in Chunk::addColumn(), max position = {}",
position, columns.size() - 1); position, !columns.empty() ? columns.size() - 1 : 0);
if (empty()) if (empty())
num_rows = column->size(); num_rows = column->size();
else if (column->size() != num_rows) else if (column->size() != num_rows)
@ -143,7 +143,7 @@ void Chunk::erase(size_t position)
if (position >= columns.size()) if (position >= columns.size())
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::erase(), max position = {}", throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::erase(), max position = {}",
toString(position), toString(columns.size() - 1)); toString(position), toString(!columns.empty() ? columns.size() - 1 : 0));
columns.erase(columns.begin() + position); columns.erase(columns.begin() + position);
} }

View File

@ -4,19 +4,41 @@
#include <set> #include <set>
#if USE_AWS_S3 && USE_PARQUET #if USE_AWS_S3 && USE_PARQUET
#include <parquet/file_reader.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnNullable.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <Storages/ObjectStorage/DataLakes/Common.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h> #include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h> #include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h> #include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Formats/FormatFactory.h>
#include <Columns/ColumnString.h> #include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnNullable.h> #include <DataTypes/DataTypeArray.h>
#include <Common/logger_useful.h> #include <DataTypes/DataTypeDate.h>
#include <IO/ReadBufferFromFileBase.h> #include <DataTypes/DataTypeDateTime64.h>
#include <IO/ReadHelpers.h> #include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <boost/algorithm/string/case_conv.hpp> #include <boost/algorithm/string/case_conv.hpp>
#include <parquet/file_reader.h>
#include <parquet/arrow/reader.h> #include <parquet/arrow/reader.h>
#include <Storages/ObjectStorage/DataLakes/Common.h> #include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
namespace fs = std::filesystem;
namespace DB namespace DB
{ {
@ -25,10 +47,14 @@ namespace ErrorCodes
{ {
extern const int INCORRECT_DATA; extern const int INCORRECT_DATA;
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
} }
struct DeltaLakeMetadata::Impl struct DeltaLakeMetadataImpl
{ {
using ConfigurationPtr = DeltaLakeMetadata::ConfigurationPtr;
ObjectStoragePtr object_storage; ObjectStoragePtr object_storage;
ConfigurationPtr configuration; ConfigurationPtr configuration;
ContextPtr context; ContextPtr context;
@ -37,7 +63,7 @@ struct DeltaLakeMetadata::Impl
* Useful links: * Useful links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files * - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/ */
Impl(ObjectStoragePtr object_storage_, DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_, ConfigurationPtr configuration_,
ContextPtr context_) ContextPtr context_)
: object_storage(object_storage_) : object_storage(object_storage_)
@ -74,9 +100,17 @@ struct DeltaLakeMetadata::Impl
* An action changes one aspect of the table's state, for example, adding or removing a file. * An action changes one aspect of the table's state, for example, adding or removing a file.
* Note: it is not a valid json, but a list of json's, so we read it in a while cycle. * Note: it is not a valid json, but a list of json's, so we read it in a while cycle.
*/ */
std::set<String> processMetadataFiles() struct DeltaLakeMetadata
{
NamesAndTypesList schema;
Strings data_files;
DataLakePartitionColumns partition_columns;
};
DeltaLakeMetadata processMetadataFiles()
{ {
std::set<String> result_files; std::set<String> result_files;
NamesAndTypesList current_schema;
DataLakePartitionColumns current_partition_columns;
const auto checkpoint_version = getCheckpointIfExists(result_files); const auto checkpoint_version = getCheckpointIfExists(result_files);
if (checkpoint_version) if (checkpoint_version)
@ -90,7 +124,7 @@ struct DeltaLakeMetadata::Impl
if (!object_storage->exists(StoredObject(file_path))) if (!object_storage->exists(StoredObject(file_path)))
break; break;
processMetadataFile(file_path, result_files); processMetadataFile(file_path, current_schema, current_partition_columns, result_files);
} }
LOG_TRACE( LOG_TRACE(
@ -101,10 +135,10 @@ struct DeltaLakeMetadata::Impl
{ {
const auto keys = listFiles(*object_storage, *configuration, deltalake_metadata_directory, metadata_file_suffix); const auto keys = listFiles(*object_storage, *configuration, deltalake_metadata_directory, metadata_file_suffix);
for (const String & key : keys) for (const String & key : keys)
processMetadataFile(key, result_files); processMetadataFile(key, current_schema, current_partition_columns, result_files);
} }
return result_files; return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns};
} }
/** /**
@ -136,10 +170,20 @@ struct DeltaLakeMetadata::Impl
* \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}} * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
* " * "
*/ */
void processMetadataFile(const String & key, std::set<String> & result) const
/// Read metadata file and fill `file_schema`, `file_partition_columns`, `result`.
/// `result` is a list of data files.
/// `file_schema` is a common schema for all files.
/// Schema evolution is not supported, so we check that all files have the same schema.
/// `file_partition_columns` is information about partition columns of data files.
void processMetadataFile(
const String & metadata_file_path,
NamesAndTypesList & file_schema,
DataLakePartitionColumns & file_partition_columns,
std::set<String> & result)
{ {
auto read_settings = context->getReadSettings(); auto read_settings = context->getReadSettings();
auto buf = object_storage->readObject(StoredObject(key), read_settings); auto buf = object_storage->readObject(StoredObject(metadata_file_path), read_settings);
char c; char c;
while (!buf->eof()) while (!buf->eof())
@ -157,20 +201,239 @@ struct DeltaLakeMetadata::Impl
if (json_str.empty()) if (json_str.empty())
continue; continue;
const JSON json(json_str); Poco::JSON::Parser parser;
if (json.has("add")) Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
// std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
// object->stringify(oss);
// LOG_TEST(log, "Metadata: {}", oss.str());
if (object->has("add"))
{ {
const auto path = json["add"]["path"].getString(); auto add_object = object->get("add").extract<Poco::JSON::Object::Ptr>();
result.insert(std::filesystem::path(configuration->getPath()) / path); auto path = add_object->getValue<String>("path");
result.insert(fs::path(configuration->getPath()) / path);
auto filename = fs::path(path).filename().string();
auto it = file_partition_columns.find(filename);
if (it == file_partition_columns.end())
{
if (add_object->has("partitionValues"))
{
auto partition_values = add_object->get("partitionValues").extract<Poco::JSON::Object::Ptr>();
if (partition_values->size())
{
auto & current_partition_columns = file_partition_columns[filename];
for (const auto & partition_name : partition_values->getNames())
{
const auto value = partition_values->getValue<String>(partition_name);
auto name_and_type = file_schema.tryGetByName(partition_name);
if (!name_and_type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", partition_name);
auto field = getFieldValue(value, name_and_type->type);
current_partition_columns.emplace_back(*name_and_type, field);
LOG_TEST(log, "Partition {} value is {} (for {})", partition_name, value, filename);
}
}
}
}
} }
else if (json.has("remove")) else if (object->has("remove"))
{ {
const auto path = json["remove"]["path"].getString(); auto path = object->get("remove").extract<Poco::JSON::Object::Ptr>()->getValue<String>("path");
result.erase(std::filesystem::path(configuration->getPath()) / path); result.erase(fs::path(configuration->getPath()) / path);
}
if (object->has("metaData"))
{
const auto metadata_object = object->get("metaData").extract<Poco::JSON::Object::Ptr>();
const auto schema_object = metadata_object->getValue<String>("schemaString");
Poco::JSON::Parser p;
Poco::Dynamic::Var fields_json = parser.parse(schema_object);
Poco::JSON::Object::Ptr fields_object = fields_json.extract<Poco::JSON::Object::Ptr>();
const auto fields = fields_object->get("fields").extract<Poco::JSON::Array::Ptr>();
NamesAndTypesList current_schema;
for (size_t i = 0; i < fields->size(); ++i)
{
const auto field = fields->getObject(static_cast<UInt32>(i));
auto column_name = field->getValue<String>("name");
auto type = field->getValue<String>("type");
auto is_nullable = field->getValue<bool>("nullable");
std::string physical_name;
auto schema_metadata_object = field->get("metadata").extract<Poco::JSON::Object::Ptr>();
if (schema_metadata_object->has("delta.columnMapping.physicalName"))
physical_name = schema_metadata_object->getValue<String>("delta.columnMapping.physicalName");
else
physical_name = column_name;
LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}",
column_name, type, is_nullable, physical_name);
current_schema.push_back({physical_name, getFieldType(field, "type", is_nullable)});
}
if (file_schema.empty())
{
file_schema = current_schema;
}
else if (file_schema != current_schema)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from files with different schema is not possible "
"({} is different from {})",
file_schema.toString(), current_schema.toString());
}
} }
} }
} }
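`processMetadataFile()` above switches from the simple `JSON` wrapper to Poco::JSON and now also picks up `partitionValues` from each `add` action. For orientation, a self-contained sketch of parsing one such action the same way (the JSON snippet is abridged and illustrative, loosely following the Delta Lake protocol, not a complete log entry):

```cpp
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <string>

int main()
{
    // Abridged "add" action: the data file path plus the constant values of
    // its partition columns, stored as strings.
    const std::string json_str =
        R"({"add": {"path": "b=1/part-00000.parquet", "partitionValues": {"b": "1"}}})";

    Poco::JSON::Parser parser;
    Poco::JSON::Object::Ptr object = parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();

    auto add_object = object->get("add").extract<Poco::JSON::Object::Ptr>();
    std::cout << "data file: " << add_object->getValue<std::string>("path") << '\n';

    // Partition values stay strings here; the code above later converts them
    // to typed Fields using the column type taken from the table schema.
    auto partition_values = add_object->get("partitionValues").extract<Poco::JSON::Object::Ptr>();
    for (const auto & name : partition_values->getNames())
        std::cout << "partition column " << name << " = "
                  << partition_values->getValue<std::string>(name) << '\n';
}
```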
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable)
{
if (field->isObject(type_key))
return getComplexTypeFromObject(field->getObject(type_key));
auto type = field->get(type_key);
if (type.isString())
{
const String & type_name = type.extract<String>();
auto data_type = getSimpleTypeByName(type_name);
return is_nullable ? makeNullable(data_type) : data_type;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
}
Field getFieldValue(const String & value, DataTypePtr data_type)
{
DataTypePtr check_type;
if (data_type->isNullable())
check_type = static_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
else
check_type = data_type;
WhichDataType which(check_type->getTypeId());
if (which.isStringOrFixedString())
return value;
else if (which.isInt8())
return parse<Int8>(value);
else if (which.isUInt8())
return parse<UInt8>(value);
else if (which.isInt16())
return parse<Int16>(value);
else if (which.isUInt16())
return parse<UInt16>(value);
else if (which.isInt32())
return parse<Int32>(value);
else if (which.isUInt32())
return parse<UInt32>(value);
else if (which.isInt64())
return parse<Int64>(value);
else if (which.isUInt64())
return parse<UInt64>(value);
else if (which.isFloat32())
return parse<Float32>(value);
else if (which.isFloat64())
return parse<Float64>(value);
else if (which.isDate())
return UInt16{LocalDate{std::string(value)}.getDayNum()};
else if (which.isDate32())
return Int32{LocalDate{std::string(value)}.getExtenedDayNum()};
else if (which.isDateTime64())
{
ReadBufferFromString in(value);
DateTime64 time = 0;
readDateTime64Text(time, 6, in, assert_cast<const DataTypeDateTime64 *>(data_type.get())->getTimeZone());
return time;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType());
}
DataTypePtr getSimpleTypeByName(const String & type_name)
{
/// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types
if (type_name == "string" || type_name == "binary")
return std::make_shared<DataTypeString>();
if (type_name == "long")
return std::make_shared<DataTypeInt64>();
if (type_name == "integer")
return std::make_shared<DataTypeInt32>();
if (type_name == "short")
return std::make_shared<DataTypeInt16>();
if (type_name == "byte")
return std::make_shared<DataTypeInt8>();
if (type_name == "float")
return std::make_shared<DataTypeFloat32>();
if (type_name == "double")
return std::make_shared<DataTypeFloat64>();
if (type_name == "boolean")
return DataTypeFactory::instance().get("Bool");
if (type_name == "date")
return std::make_shared<DataTypeDate32>();
if (type_name == "timestamp")
return std::make_shared<DataTypeDateTime64>(6);
if (type_name.starts_with("decimal(") && type_name.ends_with(')'))
{
ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
size_t precision;
size_t scale;
readIntText(precision, buf);
skipWhitespaceIfAny(buf);
assertChar(',', buf);
skipWhitespaceIfAny(buf);
tryReadIntText(scale, buf);
return createDecimal<DataTypeDecimal>(precision, scale);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name);
}
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{
String type_name = type->getValue<String>("type");
if (type_name == "struct")
{
DataTypes element_types;
Names element_names;
auto fields = type->get("fields").extract<Poco::JSON::Array::Ptr>();
element_types.reserve(fields->size());
element_names.reserve(fields->size());
for (size_t i = 0; i != fields->size(); ++i)
{
auto field = fields->getObject(static_cast<Int32>(i));
element_names.push_back(field->getValue<String>("name"));
auto required = field->getValue<bool>("required");
element_types.push_back(getFieldType(field, "type", required));
}
return std::make_shared<DataTypeTuple>(element_types, element_names);
}
if (type_name == "array")
{
bool is_nullable = type->getValue<bool>("containsNull");
auto element_type = getFieldType(type, "elementType", is_nullable);
return std::make_shared<DataTypeArray>(element_type);
}
if (type_name == "map")
{
bool is_nullable = type->getValue<bool>("containsNull");
auto key_type = getFieldType(type, "keyType", /* is_nullable */false);
auto value_type = getFieldType(type, "valueType", is_nullable);
return std::make_shared<DataTypeMap>(key_type, value_type);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name);
}
/** /**
* Checkpoints in delta-lake are created each 10 commits by default. * Checkpoints in delta-lake are created each 10 commits by default.
* Latest checkpoint is written in _last_checkpoint file: _delta_log/_last_checkpoint * Latest checkpoint is written in _last_checkpoint file: _delta_log/_last_checkpoint
@ -277,8 +540,8 @@ struct DeltaLakeMetadata::Impl
ArrowMemoryPool::instance(), ArrowMemoryPool::instance(),
&reader)); &reader));
std::shared_ptr<arrow::Schema> schema; std::shared_ptr<arrow::Schema> file_schema;
THROW_ARROW_NOT_OK(reader->GetSchema(&schema)); THROW_ARROW_NOT_OK(reader->GetSchema(&file_schema));
ArrowColumnToCHColumn column_reader( ArrowColumnToCHColumn column_reader(
header, "Parquet", header, "Parquet",
@ -325,18 +588,15 @@ DeltaLakeMetadata::DeltaLakeMetadata(
ObjectStoragePtr object_storage_, ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_, ConfigurationPtr configuration_,
ContextPtr context_) ContextPtr context_)
: impl(std::make_unique<Impl>(object_storage_, configuration_, context_))
{ {
} auto impl = DeltaLakeMetadataImpl(object_storage_, configuration_, context_);
auto result = impl.processMetadataFiles();
data_files = result.data_files;
schema = result.schema;
partition_columns = result.partition_columns;
Strings DeltaLakeMetadata::getDataFiles() const LOG_TRACE(impl.log, "Found {} data files, {} partition files, schema: {}",
{ data_files.size(), partition_columns.size(), schema.toString());
if (!data_files.empty())
return data_files;
auto result = impl->processMetadataFiles();
data_files = Strings(result.begin(), result.end());
return data_files;
} }
} }

View File

@ -20,9 +20,13 @@ public:
ConfigurationPtr configuration_, ConfigurationPtr configuration_,
ContextPtr context_); ContextPtr context_);
Strings getDataFiles() const override; Strings getDataFiles() const override { return data_files; }
NamesAndTypesList getTableSchema() const override { return {}; } NamesAndTypesList getTableSchema() const override { return schema; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
bool operator ==(const IDataLakeMetadata & other) const override bool operator ==(const IDataLakeMetadata & other) const override
{ {
@ -41,9 +45,10 @@ public:
} }
private: private:
struct Impl;
const std::shared_ptr<Impl> impl;
mutable Strings data_files; mutable Strings data_files;
NamesAndTypesList schema;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
}; };
} }

View File

@ -26,6 +26,10 @@ public:
NamesAndTypesList getTableSchema() const override { return {}; } NamesAndTypesList getTableSchema() const override { return {}; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
bool operator ==(const IDataLakeMetadata & other) const override bool operator ==(const IDataLakeMetadata & other) const override
{ {
const auto * hudi_metadata = dynamic_cast<const HudiMetadata *>(&other); const auto * hudi_metadata = dynamic_cast<const HudiMetadata *>(&other);
@ -46,6 +50,8 @@ private:
const ObjectStoragePtr object_storage; const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration; const ConfigurationPtr configuration;
mutable Strings data_files; mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
Strings getDataFilesImpl() const; Strings getDataFilesImpl() const;
}; };

View File

@ -2,6 +2,7 @@
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Core/Types.h> #include <Core/Types.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include "PartitionColumns.h"
namespace DB namespace DB
{ {
@ -13,6 +14,8 @@ public:
virtual Strings getDataFiles() const = 0; virtual Strings getDataFiles() const = 0;
virtual NamesAndTypesList getTableSchema() const = 0; virtual NamesAndTypesList getTableSchema() const = 0;
virtual bool operator==(const IDataLakeMetadata & other) const = 0; virtual bool operator==(const IDataLakeMetadata & other) const = 0;
virtual const DataLakePartitionColumns & getPartitionColumns() const = 0;
virtual const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const = 0;
}; };
using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>; using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>;

View File

@ -81,7 +81,7 @@ public:
auto metadata = DataLakeMetadata::create(object_storage_, base_configuration, local_context); auto metadata = DataLakeMetadata::create(object_storage_, base_configuration, local_context);
auto schema_from_metadata = metadata->getTableSchema(); auto schema_from_metadata = metadata->getTableSchema();
if (schema_from_metadata != NamesAndTypesList{}) if (!schema_from_metadata.empty())
{ {
return ColumnsDescription(std::move(schema_from_metadata)); return ColumnsDescription(std::move(schema_from_metadata));
} }
@ -99,13 +99,13 @@ public:
Storage::updateConfiguration(local_context); Storage::updateConfiguration(local_context);
auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context); auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
if (current_metadata && *current_metadata == *new_metadata) if (current_metadata && *current_metadata == *new_metadata)
return; return;
current_metadata = std::move(new_metadata); current_metadata = std::move(new_metadata);
auto updated_configuration = base_configuration->clone(); auto updated_configuration = base_configuration->clone();
updated_configuration->setPaths(current_metadata->getDataFiles()); updated_configuration->setPaths(current_metadata->getDataFiles());
updated_configuration->setPartitionColumns(current_metadata->getPartitionColumns());
Storage::configuration = updated_configuration; Storage::configuration = updated_configuration;
} }
@ -123,11 +123,42 @@ public:
{ {
base_configuration->format = Storage::configuration->format; base_configuration->format = Storage::configuration->format;
} }
if (current_metadata)
{
const auto & columns = current_metadata->getPartitionColumns();
base_configuration->setPartitionColumns(columns);
Storage::configuration->setPartitionColumns(columns);
}
} }
private: private:
ConfigurationPtr base_configuration; ConfigurationPtr base_configuration;
DataLakeMetadataPtr current_metadata; DataLakeMetadataPtr current_metadata;
ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context) override
{
auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns);
if (!current_metadata)
{
Storage::updateConfiguration(local_context);
current_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
}
auto column_mapping = current_metadata->getColumnNameToPhysicalNameMapping();
if (!column_mapping.empty())
{
for (const auto & [column_name, physical_name] : column_mapping)
{
auto & column = info.format_header.getByName(column_name);
column.name = physical_name;
}
}
return info;
}
}; };
using StorageIceberg = IStorageDataLake<IcebergMetadata>; using StorageIceberg = IStorageDataLake<IcebergMetadata>;
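The new `prepareReadingFromFormat` override rewrites requested column names to the physical names recorded in the Delta Lake schema metadata (`delta.columnMapping.physicalName`), so the format reader looks for the right columns inside the data files. A small sketch of that renaming step under those assumptions (helper name is illustrative):

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Map query-facing column names to the physical names used inside the data
// files; names without a mapping entry are read as-is.
std::vector<std::string> toPhysicalNames(
    const std::vector<std::string> & requested,
    const std::unordered_map<std::string, std::string> & column_name_to_physical_name)
{
    std::vector<std::string> result;
    result.reserve(requested.size());
    for (const auto & name : requested)
    {
        auto it = column_name_to_physical_name.find(name);
        result.push_back(it == column_name_to_physical_name.end() ? name : it->second);
    }
    return result;
}
```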

View File

@ -82,6 +82,10 @@ public:
/// Get table schema parsed from metadata. /// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { return schema; } NamesAndTypesList getTableSchema() const override { return schema; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
bool operator ==(const IDataLakeMetadata & other) const override bool operator ==(const IDataLakeMetadata & other) const override
{ {
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other); const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
@ -104,6 +108,8 @@ private:
Int32 current_schema_id; Int32 current_schema_id;
NamesAndTypesList schema; NamesAndTypesList schema;
mutable Strings data_files; mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
LoggerPtr log; LoggerPtr log;
}; };

View File

@ -0,0 +1,19 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/Field.h>
namespace DB
{
struct DataLakePartitionColumn
{
NameAndTypePair name_and_type;
Field value;
bool operator ==(const DataLakePartitionColumn & other) const = default;
};
/// Data file -> partition columns
using DataLakePartitionColumns = std::unordered_map<std::string, std::vector<DataLakePartitionColumn>>;
}
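A short usage sketch of this `DataLakePartitionColumns` mapping, with stand-in types in place of the ClickHouse `NameAndTypePair` and `Field` (purely illustrative):

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for DataLakePartitionColumn, just to show the shape of the map:
// one entry per data file, listing its partition columns and their constant values.
struct PartitionColumnStub { std::string name; std::string type; std::string value; };
using PartitionColumnsStub = std::unordered_map<std::string, std::vector<PartitionColumnStub>>;

int main()
{
    PartitionColumnsStub partition_columns;
    partition_columns["part-00000.parquet"] = {
        {"b", "String", "test1"},
        {"c", "Date", "2000-01-01"},
    };
    // A reader later looks up the file name and materializes each entry as a
    // constant column of the recorded value.
}
```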

View File

@ -203,6 +203,15 @@ private:
}; };
} }
ReadFromFormatInfo StorageObjectStorage::prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr /* local_context */)
{
return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns);
}
void StorageObjectStorage::read( void StorageObjectStorage::read(
QueryPlan & query_plan, QueryPlan & query_plan,
const Names & column_names, const Names & column_names,
@ -222,7 +231,7 @@ void StorageObjectStorage::read(
} }
const auto read_from_format_info = prepareReadingFromFormat( const auto read_from_format_info = prepareReadingFromFormat(
column_names, storage_snapshot, supportsSubsetOfColumns(local_context)); column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context);
const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files; && local_context->getSettingsRef().optimize_count_from_files;
@ -451,6 +460,7 @@ StorageObjectStorage::Configuration::Configuration(const Configuration & other)
format = other.format; format = other.format;
compression_method = other.compression_method; compression_method = other.compression_method;
structure = other.structure; structure = other.structure;
partition_columns = other.partition_columns;
} }
bool StorageObjectStorage::Configuration::withPartitionWildcard() const bool StorageObjectStorage::Configuration::withPartitionWildcard() const

View File

@ -5,6 +5,7 @@
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <Storages/prepareReadingFromFormat.h> #include <Storages/prepareReadingFromFormat.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/PartitionColumns.h>
namespace DB namespace DB
{ {
@ -117,6 +118,12 @@ public:
protected: protected:
virtual void updateConfiguration(ContextPtr local_context); virtual void updateConfiguration(ContextPtr local_context);
virtual ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context);
static std::unique_ptr<ReadBufferIterator> createReadBufferIterator( static std::unique_ptr<ReadBufferIterator> createReadBufferIterator(
const ObjectStoragePtr & object_storage, const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration, const ConfigurationPtr & configuration,
@ -188,6 +195,9 @@ public:
virtual ConfigurationPtr clone() = 0; virtual ConfigurationPtr clone() = 0;
virtual bool isStaticConfiguration() const { return true; } virtual bool isStaticConfiguration() const { return true; }
void setPartitionColumns(const DataLakePartitionColumns & columns) { partition_columns = columns; }
const DataLakePartitionColumns & getPartitionColumns() const { return partition_columns; }
String format = "auto"; String format = "auto";
String compression_method = "auto"; String compression_method = "auto";
String structure = "auto"; String structure = "auto";
@ -199,6 +209,7 @@ protected:
void assertInitialized() const; void assertInitialized() const;
bool initialized = false; bool initialized = false;
DataLakePartitionColumns partition_columns;
}; };
} }

View File

@ -203,6 +203,31 @@ Chunk StorageObjectStorageSource::generate()
.filename = &filename, .filename = &filename,
.last_modified = object_info->metadata->last_modified .last_modified = object_info->metadata->last_modified
}); });
const auto & partition_columns = configuration->getPartitionColumns();
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())
{
auto partition_values = partition_columns.find(filename);
for (const auto & [name_and_type, value] : partition_values->second)
{
if (!read_from_format_info.source_header.has(name_and_type.name))
continue;
const auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name);
auto partition_column = name_and_type.type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst();
/// This column is filled with default value now, remove it.
chunk.erase(column_pos);
/// Add correct values.
if (chunk.hasColumns())
chunk.addColumn(column_pos, std::move(partition_column));
else
chunk.addColumn(std::move(partition_column));
}
}
return chunk; return chunk;
} }
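The new block in `generate()` overwrites columns that the format reader filled with defaults, replacing them with constant columns built from the file's partition values. A toy sketch of that replace-at-position step, using plain vectors instead of ClickHouse columns (names are illustrative):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Toy "chunk": named columns of equal length.
struct ToyColumn { std::string name; std::vector<std::string> values; };
using ToyChunk = std::vector<ToyColumn>;

// Drop the default-filled column at `pos` and put a constant column with the
// partition value in its place, keeping the row count of the chunk.
void fillPartitionColumn(ToyChunk & chunk, size_t pos, const std::string & name, const std::string & value)
{
    const size_t num_rows = chunk.empty() ? 0 : chunk.front().values.size();
    chunk.erase(chunk.begin() + static_cast<std::ptrdiff_t>(pos));
    chunk.insert(chunk.begin() + static_cast<std::ptrdiff_t>(pos),
                 ToyColumn{name, std::vector<std::string>(num_rows, value)});
}
```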

View File

@ -6,6 +6,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/PartitionColumns.h>
namespace DB namespace DB

View File

@ -34,7 +34,6 @@ public:
std::atomic<time_t> processing_start_time = 0; std::atomic<time_t> processing_start_time = 0;
std::atomic<time_t> processing_end_time = 0; std::atomic<time_t> processing_end_time = 0;
std::atomic<size_t> retries = 0; std::atomic<size_t> retries = 0;
ProfileEvents::Counters profile_counters;
private: private:
mutable std::mutex last_exception_mutex; mutable std::mutex last_exception_mutex;

View File

@ -509,10 +509,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
path, processed_rows_from_file); path, processed_rows_from_file);
} }
auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
/// FIXME: if files are compressed, profile counters update does not work fully (object storage related counters are not saved). Why?
try try
{ {
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds); auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds);
@ -714,7 +710,6 @@ void ObjectStorageQueueSource::appendLogElement(
.file_name = filename, .file_name = filename,
.rows_processed = processed_rows, .rows_processed = processed_rows,
.status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed, .status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed,
.counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(),
.processing_start_time = file_status_.processing_start_time, .processing_start_time = file_status_.processing_start_time,
.processing_end_time = file_status_.processing_end_time, .processing_end_time = file_status_.processing_end_time,
.exception = file_status_.getException(), .exception = file_status_.getException(),

View File

@ -32,7 +32,6 @@ ColumnsDescription StorageSystemS3Queue::getColumnsDescription()
{"status", std::make_shared<DataTypeString>(), "Status of processing: Processed, Processing, Failed"}, {"status", std::make_shared<DataTypeString>(), "Status of processing: Processed, Processing, Failed"},
{"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time at which processing of the file started"}, {"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time at which processing of the file started"},
{"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time at which processing of the file ended"}, {"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time at which processing of the file ended"},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>()), "Profile events collected during processing of the file"},
{"exception", std::make_shared<DataTypeString>(), "Exception which happened during processing"}, {"exception", std::make_shared<DataTypeString>(), "Exception which happened during processing"},
}; };
} }
@ -65,8 +64,6 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co
else else
res_columns[i++]->insertDefault(); res_columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true);
res_columns[i++]->insert(file_status->getException()); res_columns[i++]->insert(file_status->getException());
} }
} }

View File

@ -36,7 +36,7 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
if (cached_columns.empty()) if (cached_columns.empty())
return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query); return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query);
if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query)) if (hasStaticStructure() && cached_columns == getActualTableStructure(context, is_insert_query))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query); return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query);
auto this_table_function = shared_from_this(); auto this_table_function = shared_from_this();

View File

@ -153,7 +153,7 @@ def test_single_log_file(started_cluster):
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
TABLE_NAME = "test_single_log_file" TABLE_NAME = "test_single_log_file"
inserted_data = "SELECT number, toString(number + 1) FROM numbers(100)" inserted_data = "SELECT number as a, toString(number + 1) as b FROM numbers(100)"
parquet_data_path = create_initial_data_file( parquet_data_path = create_initial_data_file(
started_cluster, instance, inserted_data, TABLE_NAME started_cluster, instance, inserted_data, TABLE_NAME
) )
@ -511,3 +511,104 @@ def test_restart_broken_table_function(started_cluster):
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "") upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
def test_partition_columns(started_cluster):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_columns"
result_file = f"{TABLE_NAME}"
partition_columns = ["b", "c", "d", "e"]
delta_table = (
DeltaTable.create(spark)
.tableName(TABLE_NAME)
.location(f"/{result_file}")
.addColumn("a", "INT")
.addColumn("b", "STRING")
.addColumn("c", "DATE")
.addColumn("d", "INT")
.addColumn("e", "BOOLEAN")
.partitionedBy(partition_columns)
.execute()
)
num_rows = 9
schema = StructType(
[
StructField("a", IntegerType()),
StructField("b", StringType()),
StructField("c", DateType()),
StructField("d", IntegerType()),
StructField("e", BooleanType()),
]
)
for i in range(1, num_rows + 1):
data = [
(
i,
"test" + str(i),
datetime.strptime(f"2000-01-0{i}", "%Y-%m-%d"),
i,
False,
)
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.write.mode("append").format("delta").partitionBy(partition_columns).save(
f"/{TABLE_NAME}"
)
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) > 0
print(f"Uploaded files: {files}")
result = instance.query(
f"describe table deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"
).strip()
assert (
result
== "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date32)\t\t\t\t\t\nd\tNullable(Int32)\t\t\t\t\t\ne\tNullable(Bool)"
)
result = int(
instance.query(
f"""SELECT count()
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')
"""
)
)
assert result == num_rows
result = int(
instance.query(
f"""SELECT count()
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')
WHERE c == toDateTime('2000/01/05')
"""
)
)
assert result == 1
# instance.query(
# f"""
# DROP TABLE IF EXISTS {TABLE_NAME};
# CREATE TABLE {TABLE_NAME} (a Int32, b String, c DateTime)
# ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
# )
# assert (
# int(
# instance.query(
# f"SELECT count() FROM {TABLE_NAME} WHERE c != toDateTime('2000/01/05')"
# )
# )
# == num_rows - 1
# )
# instance.query(f"SELECT a, b, c, FROM {TABLE_NAME}")
# assert False

View File

@ -1 +0,0 @@
SELECT sum(ignore(bitTest(number, 65))) FROM numbers(10);

View File

@ -1,3 +1,22 @@
-- bitTestAny
0 1
1 0
2 1
3 0
4 1
5 0
6 1
7 0
-- bitTestAll
0 1
1 0
2 1
3 0
4 1
5 0
6 1
7 0
-- bitTest
0 1 0 1
1 0 1 0
2 1 2 1
@ -6,98 +25,6 @@
5 0 5 0
6 1 6 1
7 0 7 0
8 0
9 0
10 0
11 0
12 0
13 0
14 0
15 0
16 0
17 0
18 0
19 0
20 0
21 0
22 0
23 0
24 0
25 0
26 0
27 0
28 0
29 0
30 0
31 0
32 0
33 0
34 0
35 0
36 0
37 0
38 0
39 0
40 0
41 0
42 0
43 0
44 0
45 0
46 0
47 0
48 0
49 0
50 0
51 0
52 0
53 0
54 0
55 0
56 0
57 0
58 0
59 0
60 0
61 0
62 0
63 0
64 0
65 0
66 0
67 0
68 0
69 0
70 0
71 0
72 0
73 0
74 0
75 0
76 0
77 0
78 0
79 0
80 0
81 0
82 0
83 0
84 0
85 0
86 0
87 0
88 0
89 0
90 0
91 0
92 0
93 0
94 0
95 0
96 0
97 0
98 0
99 0
0 1 0 1
1 0 1 0
2 1 2 1
@ -107,94 +34,10 @@
6 1 6 1
7 0 7 0
8 1 8 1
9 1 9 0
10 1 10 1
11 1 11 0
12 1 12 1
13 1 13 0
14 1 14 1
15 1 15 0
16 1
17 1
18 1
19 1
20 1
21 1
22 1
23 1
24 1
25 1
26 1
27 1
28 1
29 1
30 1
31 1
32 1
33 1
34 1
35 1
36 1
37 1
38 1
39 1
40 1
41 1
42 1
43 1
44 1
45 1
46 1
47 1
48 1
49 1
50 1
51 1
52 1
53 1
54 1
55 1
56 1
57 1
58 1
59 1
60 1
61 1
62 1
63 1
64 1
65 1
66 1
67 1
68 1
69 1
70 1
71 1
72 1
73 1
74 1
75 1
76 1
77 1
78 1
79 1
80 1
81 1
82 1
83 1
84 1
85 1
86 1
87 1
88 1
89 1
90 1
91 1
92 1
93 1
94 1
95 1
96 1
97 1
98 1
99 1

View File

@ -1,2 +1,13 @@
SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(100); SELECT '-- bitTestAny';
SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(100); SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8);
SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND }
SELECT '-- bitTestAll';
SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8);
SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND }
SELECT '-- bitTest';
SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8);
SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND }
SELECT number, bitTest(toUInt16(1 + 4 + 16 + 64 + 256 + 1024 + 4096 + 16384 + 65536), number) FROM numbers(16);
SELECT -number, bitTest(toUInt16(1), -number) FROM numbers(8); -- { serverError PARAMETER_OUT_OF_BOUND }

View File

@ -16,7 +16,7 @@ select min(i), max(i), count() from d where _partition_value.1 = 10 group by _pa
select min(i) from d where 1 = _partition_value.1; select min(i) from d where 1 = _partition_value.1;
-- fuzz crash https://github.com/ClickHouse/ClickHouse/issues/37151 -- fuzz crash https://github.com/ClickHouse/ClickHouse/issues/37151
SELECT min(i), max(i), count() FROM d WHERE (_partition_value.1) = 0 GROUP BY ignore(bitTest(ignore(NULL), 65535), NULL, (_partition_value.1) = 7, '10.25', bitTest(NULL, -9223372036854775808), NULL, ignore(ignore(-2147483647, NULL)), 1024), _partition_id ORDER BY _partition_id ASC NULLS FIRST; SELECT min(i), max(i), count() FROM d WHERE (_partition_value.1) = 0 GROUP BY ignore(bitTest(ignore(NULL), 0), NULL, (_partition_value.1) = 7, '10.25', bitTest(NULL, 0), NULL, ignore(ignore(-2147483647, NULL)), 1024), _partition_id ORDER BY _partition_id ASC NULLS FIRST;
drop table d; drop table d;

View File

@ -48,7 +48,6 @@ AutoML
Autocompletion Autocompletion
AvroConfluent AvroConfluent
BIGINT BIGINT
bigrams
BIGSERIAL BIGSERIAL
BORO BORO
BSON BSON
@ -223,7 +222,6 @@ DatabaseOrdinaryThreadsActive
DateTime DateTime
DateTimes DateTimes
DbCL DbCL
deallocated
Decrypted Decrypted
Deduplicate Deduplicate
Deduplication Deduplication
@ -295,7 +293,6 @@ FilesystemMainPathUsedBytes
FilesystemMainPathUsedINodes FilesystemMainPathUsedINodes
FixedString FixedString
FlameGraph FlameGraph
flameGraph
Flink Flink
ForEach ForEach
FreeBSD FreeBSD
@ -995,6 +992,8 @@ UPDATEs
URIs URIs
URL URL
URL's URL's
URLDecode
URLEncode
URLHash URLHash
URLHierarchy URLHierarchy
URLPathHierarchy URLPathHierarchy
@ -1012,13 +1011,10 @@ UncompressedCacheBytes
UncompressedCacheCells UncompressedCacheCells
UnidirectionalEdgeIsValid UnidirectionalEdgeIsValid
UniqThetaSketch UniqThetaSketch
unigrams
Updatable Updatable
Uppercased Uppercased
Uptime Uptime
Uptrace Uptrace
URLDecode
URLEncode
UserID UserID
Util Util
VARCHAR VARCHAR
@ -1224,6 +1220,7 @@ basename
bcrypt bcrypt
benchmarking benchmarking
bfloat bfloat
bigrams
binlog binlog
bitAnd bitAnd
bitCount bitCount
@ -1473,6 +1470,7 @@ dbeaver
dbgen dbgen
dbms dbms
ddl ddl
deallocated
deallocation deallocation
deallocations deallocations
debian debian
@ -1512,11 +1510,11 @@ deserializing
destructor destructor
destructors destructors
detectCharset detectCharset
detectTonality
detectLanguage detectLanguage
detectLanguageMixed detectLanguageMixed
detectLanguageUnknown detectLanguageUnknown
detectProgrammingLanguage detectProgrammingLanguage
detectTonality
determinator determinator
deterministically deterministically
dictGet dictGet
@ -1532,8 +1530,8 @@ dictIsIn
disableProtocols disableProtocols
disjunction disjunction
disjunctions disjunctions
displaySecretsInShowAndSelect
displayName displayName
displaySecretsInShowAndSelect
distro distro
divideDecimal divideDecimal
dmesg dmesg
@ -1583,11 +1581,11 @@ evalMLMethod
exFAT exFAT
expiryMsec expiryMsec
exponentialMovingAverage exponentialMovingAverage
exponentialmovingaverage
exponentialTimeDecayedAvg exponentialTimeDecayedAvg
exponentialTimeDecayedCount exponentialTimeDecayedCount
exponentialTimeDecayedMax exponentialTimeDecayedMax
exponentialTimeDecayedSum exponentialTimeDecayedSum
exponentialmovingaverage
expr expr
exprN exprN
extendedVerification extendedVerification
@ -1624,6 +1622,7 @@ firstSignificantSubdomainCustom
firstSignificantSubdomainCustomRFC firstSignificantSubdomainCustomRFC
firstSignificantSubdomainRFC firstSignificantSubdomainRFC
fixedstring fixedstring
flameGraph
flamegraph flamegraph
flatbuffers flatbuffers
flattenTuple flattenTuple
@ -1806,8 +1805,8 @@ incrementing
indexHint indexHint
indexOf indexOf
infi infi
infty
inflight inflight
infty
initcap initcap
initcapUTF initcapUTF
initialQueryID initialQueryID
@ -1955,9 +1954,9 @@ loghouse
london london
lookups lookups
loongarch loongarch
lowcardinality
lowCardinalityIndices lowCardinalityIndices
lowCardinalityKeys lowCardinalityKeys
lowcardinality
lowerUTF lowerUTF
lowercased lowercased
lttb lttb
@ -2265,9 +2264,9 @@ proleptic
prometheus prometheus
proportionsZTest proportionsZTest
proto proto
protocol
protobuf protobuf
protobufsingle protobufsingle
protocol
proxied proxied
pseudorandom pseudorandom
pseudorandomize pseudorandomize
@ -2759,6 +2758,9 @@ topLevelDomain
topLevelDomainRFC topLevelDomainRFC
topk topk
topkweighted topkweighted
transactionID
transactionLatestSnapshot
transactionOldestSnapshot
transactional transactional
transactionally transactionally
translateUTF translateUTF
@ -2812,6 +2814,7 @@ unescaping
unhex unhex
unicode unicode
unidimensional unidimensional
unigrams
unintuitive unintuitive
uniq uniq
uniqCombined uniqCombined