Merge remote-tracking branch 'blessed/master' into settings_checks

This commit is contained in:
Raúl Marín 2024-12-03 13:45:37 +01:00
commit 5a3ecbd231
94 changed files with 820 additions and 282 deletions

View File

@ -66,6 +66,7 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
# Twice as large
set (RLIMIT_DATA 10000000000)
set (RLIMIT_AS 20000000000)
set (RLIMIT_CPU 2000)
endif()
# For some files currently building RISCV64/LOONGARCH64 might be too slow.

View File

@ -82,5 +82,5 @@ ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse"
ENV EXPORT_S3_STORAGE_POLICIES=1
RUN npm install -g azurite@3.30.0 \
RUN npm install -g azurite@3.33.0 \
&& npm install -g tslib && npm install -g node

View File

@ -733,6 +733,18 @@ SELECT JSONExtract('{"day": "Thursday"}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday
SELECT JSONExtract('{"day": 5}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday\' = 1, \'Tuesday\' = 2, \'Wednesday\' = 3, \'Thursday\' = 4, \'Friday\' = 5, \'Saturday\' = 6)') = 'Friday'
```
Referring to a nested values by passing multiple indices_or_keys parameters:
```
SELECT JSONExtract('{"a":{"b":"hello","c":{"d":[1,2,3],"e":[1,3,7]}}}','a','c','Map(String, Array(UInt8))') AS val, toTypeName(val), val['d'];
```
Result:
```
┌─val───────────────────────┬─toTypeName(val)───────────┬─arrayElement(val, 'd')─┐
│ {'d':[1,2,3],'e':[1,3,7]} │ Map(String, Array(UInt8)) │ [1,2,3] │
└───────────────────────────┴───────────────────────────┴────────────────────────┘
```
### JSONExtractKeysAndValues
Parses key-value pairs from JSON where the values are of the given ClickHouse data type.

View File

@ -24,7 +24,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
@ -234,11 +233,8 @@ bool BackupWriterAzureBlobStorage::fileExists(const String & file_name)
UInt64 BackupWriterAzureBlobStorage::getFileSize(const String & file_name)
{
String key = fs::path(blob_path) / file_name;
RelativePathsWithMetadata children;
object_storage->listObjects(key,children,/*max_keys*/0);
if (children.empty())
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Object must exist");
return children[0]->metadata->size_bytes;
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
return object_metadata.size_bytes;
}
std::unique_ptr<ReadBuffer> BackupWriterAzureBlobStorage::readFile(const String & file_name, size_t /*expected_file_size*/)

View File

@ -38,7 +38,7 @@ public:
private:
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
AzureBlobStorage::ConnectionParams connection_params;
String blob_path;
std::unique_ptr<AzureObjectStorage> object_storage;
@ -88,7 +88,7 @@ private:
void removeFilesBatch(const Strings & file_names);
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
AzureBlobStorage::ConnectionParams connection_params;
String blob_path;
std::unique_ptr<AzureObjectStorage> object_storage;

View File

@ -151,7 +151,8 @@ void ConnectionEstablisherAsync::Task::run(AsyncCallback async_callback, Suspend
{
connection_establisher_async.reset();
connection_establisher_async.connection_establisher.setAsyncCallback(async_callback);
connection_establisher_async.connection_establisher.run(connection_establisher_async.result, connection_establisher_async.fail_message);
connection_establisher_async.connection_establisher.run(connection_establisher_async.result,
connection_establisher_async.fail_message, connection_establisher_async.force_connected);
connection_establisher_async.is_finished = true;
}

View File

@ -76,6 +76,8 @@ public:
const std::string & getFailMessage() const { return fail_message; }
void resumeConnectionWithForceOption(bool force_connected_) {force_connected = force_connected_; resume();}
private:
bool checkBeforeTaskResume() override;
@ -125,6 +127,7 @@ private:
bool is_finished = false;
bool restarted = false;
bool force_connected = false;
};
#endif

View File

@ -281,7 +281,7 @@ int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking, AsyncCallbac
HedgedConnectionsFactory::State HedgedConnectionsFactory::resumeConnectionEstablisher(int index, Connection *& connection_out)
{
replicas[index].connection_establisher->resume();
replicas[index].connection_establisher->resumeConnectionWithForceOption(/*force_connected_*/ shuffled_pools[index].error_count != 0);
if (replicas[index].connection_establisher->isCancelled())
return State::CANNOT_CHOOSE;

View File

@ -256,10 +256,6 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed);
Int64 will_be_rss = size ? size + rss.fetch_add(size, std::memory_order_relaxed) : rss.load(std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
CurrentMetrics::add(metric_loaded, size);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
@ -371,6 +367,10 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
}
}
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
CurrentMetrics::add(metric_loaded, size);
if (peak_updated && allocation_traced)
{
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);

View File

@ -229,6 +229,7 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks", ValueType::Microseconds) \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks", ValueType::Number) \
M(LoadedMarksFiles, "Number of mark files loaded.", ValueType::Number) \
M(LoadedMarksCount, "Number of marks loaded (total across columns).", ValueType::Number) \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.", ValueType::Bytes) \
M(LoadedPrimaryIndexFiles, "Number of primary index files loaded.", ValueType::Number) \
@ -814,6 +815,7 @@ The server successfully detected this situation and will download merged part fr
M(LogWarning, "Number of log messages with level Warning", ValueType::Number) \
M(LogError, "Number of log messages with level Error", ValueType::Number) \
M(LogFatal, "Number of log messages with level Fatal", ValueType::Number) \
M(LoggerElapsedNanoseconds, "Cumulative time spend in logging", ValueType::Nanoseconds) \
\
M(InterfaceHTTPSendBytes, "Number of bytes sent through HTTP interfaces", ValueType::Bytes) \
M(InterfaceHTTPReceiveBytes, "Number of bytes received through HTTP interfaces", ValueType::Bytes) \
@ -1087,6 +1089,11 @@ void incrementForLogMessage(Poco::Message::Priority priority)
}
}
void incrementLoggerElapsedNanoseconds(UInt64 ns)
{
increment(LoggerElapsedNanoseconds, ns);
}
CountersIncrement::CountersIncrement(Counters::Snapshot const & snapshot)
{
init();

View File

@ -173,6 +173,9 @@ namespace ProfileEvents
/// Increment a counter for log messages.
void incrementForLogMessage(Poco::Message::Priority priority);
/// Increment time consumed by logging.
void incrementLoggerElapsedNanoseconds(UInt64 ns);
/// Get name of event by identifier. Returns statically allocated string.
const char * getName(Event event);

View File

@ -11,6 +11,7 @@
#include <Common/Logger.h>
#include <Common/LoggingFormatStringHelpers.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
@ -69,6 +70,7 @@ namespace impl
if (!_is_clients_log && !_logger->is((PRIORITY))) \
break; \
\
Stopwatch _logger_watch; \
try \
{ \
ProfileEvents::incrementForLogMessage(PRIORITY); \
@ -122,6 +124,7 @@ namespace impl
{ \
::write(STDERR_FILENO, static_cast<const void *>(MESSAGE_FOR_EXCEPTION_ON_LOGGING), sizeof(MESSAGE_FOR_EXCEPTION_ON_LOGGING)); \
} \
ProfileEvents::incrementLoggerElapsedNanoseconds(_logger_watch.elapsedNanoseconds()); \
} while (false)

View File

@ -35,61 +35,71 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
}
std::string escaped_with_globs = buf_for_escaping.str();
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and ","
std::string_view input(escaped_with_globs);
static const re2::RE2 range_regex(R"({([\d]+\.\.[\d]+)})"); /// regexp for {M..N}, where M and N - non-negative integers
static const re2::RE2 enum_regex(R"({([^{}*,]+[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3}, expr's should be without "{", "}", "*" and ","
std::string_view matched;
std::string_view input(escaped_with_globs);
std::ostringstream oss_for_replacing; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss_for_replacing.exceptions(std::ios::failbit);
size_t current_index = 0;
while (RE2::FindAndConsume(&input, enum_or_range, &matched))
while (RE2::FindAndConsume(&input, range_regex, &matched))
{
std::string buffer(matched);
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
if (!buffer.contains(','))
size_t range_begin = 0;
size_t range_end = 0;
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t output_width = 0;
if (range_begin > range_end) //Descending Sequence {20..15} {9..01}
{
size_t range_begin = 0;
size_t range_end = 0;
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; ///Special Case: {0..10} {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t output_width = 0;
if (range_begin > range_end) //Descending Sequence {20..15} {9..01}
{
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; ///Special Case: {0..10} {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << range_begin;
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << range_begin;
oss_for_replacing << i;
}
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << i;
}
}
else
{
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
}
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
while (RE2::FindAndConsume(&input, enum_regex, &matched))
{
std::string buffer(matched);
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
oss_for_replacing << escaped_with_globs.substr(current_index);
std::string almost_res = oss_for_replacing.str();
WriteBufferFromOwnString buf_final_processing;

View File

@ -12,6 +12,9 @@ TEST(Common, makeRegexpPatternFromGlobs)
EXPECT_EQ(makeRegexpPatternFromGlobs("*"), "[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("/?"), "/[^/]");
EXPECT_EQ(makeRegexpPatternFromGlobs("/*"), "/[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("{123}"), "(123)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{test}"), "(test)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{test.tar.gz}"), "(test\\.tar\\.gz)");
EXPECT_EQ(makeRegexpPatternFromGlobs("*_{{a,b,c,d}}/?.csv"), "[^/]*_\\{(a|b|c|d)\\}/[^/]\\.csv");
/* Regex Parsing for {..} can have three possible cases
1) The left range width == the right range width

View File

@ -142,13 +142,14 @@ void KeeperStateMachine<Storage>::init()
}
catch (...)
{
tryLogCurrentException(
LOG_FATAL(
log,
fmt::format(
"Aborting because of failure to load from latest snapshot with index {}. Problematic snapshot can be removed but it will "
"lead to data loss",
latest_log_index));
std::abort();
"Failure to load from latest snapshot with index {}: {}",
latest_log_index,
getCurrentExceptionMessage(true, true, false));
LOG_FATAL(
log, "Manual intervention is necessary for recovery. Problematic snapshot can be removed but it will lead to data loss");
abort();
}
}
@ -427,8 +428,13 @@ bool KeeperStateMachine<Storage>::preprocess(const KeeperStorageBase::RequestFor
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to preprocess stored log at index {}, aborting to avoid inconsistent state", request_for_session.log_idx));
std::abort();
LOG_FATAL(
log,
"Failed to preprocess stored log at index {}: {}",
request_for_session.log_idx,
getCurrentExceptionMessage(true, true, false));
LOG_FATAL(log, "Aborting to avoid inconsistent state");
abort();
}
if (keeper_context->digestEnabled() && request_for_session.digest)

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template<typename V>
struct ListNode
{
@ -292,7 +297,8 @@ public:
{
size_t hash_value = map.hash(key);
auto it = map.find(key, hash_value);
chassert(it != map.end());
if (it == map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find key: '{}'", key);
auto list_itr = it->getMapped();
uint64_t old_value_size = list_itr->value.sizeInBytes();
@ -348,7 +354,8 @@ public:
const V & getValue(StringRef key) const
{
auto it = map.find(key);
chassert(it);
if (it == map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find key: '{}'", key);
return it->getMapped()->value;
}
@ -356,7 +363,8 @@ public:
{
for (auto & itr : snapshot_invalid_iters)
{
chassert(!itr->isActiveInMap());
if (itr->isActiveInMap())
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} is not active in map", itr->key);
updateDataSize(ERASE, itr->key.size, 0, itr->value.sizeInBytes(), /*remove_old=*/true);
if (itr->getFreeKey())
arena.free(const_cast<char *>(itr->key.data), itr->key.size);

View File

@ -479,12 +479,20 @@ public:
return true;
}
template <typename T> auto & safeGet() const
template <typename T> const auto & safeGet() const &
{
return const_cast<Field *>(this)->safeGet<T>();
}
template <typename T> auto safeGet() const &&
{
return std::move(const_cast<Field *>(this)->safeGet<T>());
}
template <typename T> auto & safeGet();
template <typename T> auto & safeGet() &;
template <typename T> auto safeGet() &&
{
return std::move(safeGet<T>());
}
bool operator< (const Field & rhs) const
{
@ -880,7 +888,7 @@ constexpr bool isInt64OrUInt64orBoolFieldType(Field::Types::Which t)
}
template <typename T>
auto & Field::safeGet()
auto & Field::safeGet() &
{
const Types::Which target = TypeToEnum<NearestFieldType<std::decay_t<T>>>::value;

View File

@ -1242,6 +1242,9 @@ Set the quoting rule for identifiers in SHOW CREATE query
)", 0) \
DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"(
Set the quoting style for identifiers in SHOW CREATE query
)", 0) \
DECLARE(String, composed_data_type_output_format_mode, "default", R"(
Set composed data type output format mode, default or spark.
)", 0) \
// End of FORMAT_FACTORY_SETTINGS

View File

@ -67,6 +67,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"max_bytes_ratio_before_external_group_by", 0., 0., "New setting."},
{"max_bytes_ratio_before_external_sort", 0., 0., "New setting."},
{"use_async_executor_for_materialized_views", false, false, "New setting."},
{"composed_data_type_output_format_mode", "default", "default", "New setting"},
{"http_response_headers", "", "", "New setting."},
}
},

View File

@ -401,7 +401,7 @@ void SerializationArray::deserializeBinaryBulkWithMultipleStreams(
template <typename Writer>
static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, Writer && write_nested)
static void serializeTextImpl(const IColumn & column, size_t row_num, const FormatSettings & settings, WriteBuffer & ostr, Writer && write_nested)
{
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
@ -412,10 +412,14 @@ static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffe
const IColumn & nested_column = column_array.getData();
writeChar('[', ostr);
for (size_t i = offset; i < next_offset; ++i)
if (next_offset != offset)
write_nested(nested_column, offset);
for (size_t i = offset + 1; i < next_offset; ++i)
{
if (i != offset)
writeChar(',', ostr);
writeChar(',', ostr);
if (settings.composed_data_type_output_format_mode == "spark")
writeChar(' ', ostr);
write_nested(nested_column, i);
}
writeChar(']', ostr);
@ -520,10 +524,13 @@ static ReturnType deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reade
void SerializationArray::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeTextImpl(column, row_num, ostr,
serializeTextImpl(column, row_num, settings, ostr,
[&](const IColumn & nested_column, size_t i)
{
nested->serializeTextQuoted(nested_column, i, ostr, settings);
if (settings.composed_data_type_output_format_mode == "spark")
nested->serializeText(nested_column, i, ostr, settings);
else
nested->serializeTextQuoted(nested_column, i, ostr, settings);
});
}

View File

@ -90,6 +90,7 @@ template <typename KeyWriter, typename ValueWriter>
void SerializationMap::serializeTextImpl(
const IColumn & column,
size_t row_num,
const FormatSettings & settings,
WriteBuffer & ostr,
KeyWriter && key_writer,
ValueWriter && value_writer) const
@ -104,15 +105,31 @@ void SerializationMap::serializeTextImpl(
size_t next_offset = offsets[row_num];
writeChar('{', ostr);
for (size_t i = offset; i < next_offset; ++i)
if (offset != next_offset)
{
if (i != offset)
writeChar(',', ostr);
key_writer(ostr, key, nested_tuple.getColumn(0), i);
writeChar(':', ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), i);
key_writer(ostr, key, nested_tuple.getColumn(0), offset);
if (settings.composed_data_type_output_format_mode == "spark")
writeString(std::string_view(" -> "), ostr);
else
writeChar(':', ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), offset);
}
if (settings.composed_data_type_output_format_mode == "spark")
for (size_t i = offset + 1; i < next_offset; ++i)
{
writeString(std::string_view(", "), ostr);
key_writer(ostr, key, nested_tuple.getColumn(0), i);
writeString(std::string_view(" -> "), ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), i);
}
else
for (size_t i = offset + 1; i < next_offset; ++i)
{
writeChar(',', ostr);
key_writer(ostr, key, nested_tuple.getColumn(0), i);
writeChar(':', ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), i);
}
writeChar('}', ostr);
}
@ -221,10 +238,13 @@ void SerializationMap::serializeText(const IColumn & column, size_t row_num, Wri
{
auto writer = [&settings](WriteBuffer & buf, const SerializationPtr & subcolumn_serialization, const IColumn & subcolumn, size_t pos)
{
subcolumn_serialization->serializeTextQuoted(subcolumn, pos, buf, settings);
if (settings.composed_data_type_output_format_mode == "spark")
subcolumn_serialization->serializeText(subcolumn, pos, buf, settings);
else
subcolumn_serialization->serializeTextQuoted(subcolumn, pos, buf, settings);
};
serializeTextImpl(column, row_num, ostr, writer, writer);
serializeTextImpl(column, row_num, settings, ostr, writer, writer);
}
void SerializationMap::deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, bool whole) const
@ -266,7 +286,7 @@ bool SerializationMap::tryDeserializeText(IColumn & column, ReadBuffer & istr, c
void SerializationMap::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeTextImpl(column, row_num, ostr,
serializeTextImpl(column, row_num, settings, ostr,
[&settings](WriteBuffer & buf, const SerializationPtr & subcolumn_serialization, const IColumn & subcolumn, size_t pos)
{
/// We need to double-quote all keys (including integers) to produce valid JSON.

View File

@ -70,7 +70,7 @@ public:
private:
template <typename KeyWriter, typename ValueWriter>
void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, KeyWriter && key_writer, ValueWriter && value_writer) const;
void serializeTextImpl(const IColumn & column, size_t row_num, const FormatSettings & settings, WriteBuffer & ostr, KeyWriter && key_writer, ValueWriter && value_writer) const;
template <typename ReturnType = void, typename Reader>
ReturnType deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reader && reader) const;

View File

@ -137,12 +137,25 @@ void SerializationTuple::deserializeBinary(IColumn & column, ReadBuffer & istr,
void SerializationTuple::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('(', ostr);
for (size_t i = 0; i < elems.size(); ++i)
if (!elems.empty())
{
if (i != 0)
writeChar(',', ostr);
elems[i]->serializeTextQuoted(extractElementColumn(column, i), row_num, ostr, settings);
if (settings.composed_data_type_output_format_mode == "spark")
elems[0]->serializeText(extractElementColumn(column, 0), row_num, ostr, settings);
else
elems[0]->serializeTextQuoted(extractElementColumn(column, 0), row_num, ostr, settings);
}
if (settings.composed_data_type_output_format_mode == "spark")
for (size_t i = 1; i < elems.size(); ++i)
{
writeString(std::string_view(", "), ostr);
elems[i]->serializeText(extractElementColumn(column, i), row_num, ostr, settings);
}
else
for (size_t i = 1; i < elems.size(); ++i)
{
writeChar(',', ostr);
elems[i]->serializeTextQuoted(extractElementColumn(column, i), row_num, ostr, settings);
}
writeChar(')', ostr);
}

View File

@ -37,7 +37,7 @@ namespace ErrorCodes
}
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
ContainerClientPtr blob_container_client_,
const String & path_,
const ReadSettings & read_settings_,
size_t max_single_read_retries_,
@ -228,7 +228,7 @@ void ReadBufferFromAzureBlobStorage::initialize()
try
{
ProfileEvents::increment(ProfileEvents::AzureGetObject);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);
auto download_response = blob_client->Download(download_options);
@ -281,7 +281,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran
try
{
ProfileEvents::increment(ProfileEvents::AzureGetObject);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);
Azure::Storage::Blobs::DownloadBlobOptions download_options;

View File

@ -8,7 +8,7 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <IO/WithFileName.h>
#include <azure/storage/blobs.hpp>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
namespace DB
{
@ -16,9 +16,11 @@ namespace DB
class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase
{
public:
using ContainerClientPtr = std::shared_ptr<const AzureBlobStorage::ContainerClient>;
using BlobClientPtr = std::unique_ptr<const AzureBlobStorage::BlobClient>;
ReadBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
ContainerClientPtr blob_container_client_,
const String & path_,
const ReadSettings & read_settings_,
size_t max_single_read_retries_,
@ -53,8 +55,8 @@ private:
void initialize();
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client;
ContainerClientPtr blob_container_client;
BlobClientPtr blob_client;
const String path;
size_t max_single_read_retries;

View File

@ -54,7 +54,7 @@ BufferAllocationPolicyPtr createBufferAllocationPolicy(const AzureBlobStorage::R
}
WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
AzureClientPtr blob_container_client_,
const String & blob_path_,
size_t buf_size_,
const WriteSettings & write_settings_,
@ -142,7 +142,7 @@ void WriteBufferFromAzureBlobStorage::preFinalize()
if (block_ids.empty() && detached_part_data.size() == 1 && detached_part_data.front().data_size <= max_single_part_upload_size)
{
ProfileEvents::increment(ProfileEvents::AzureUpload);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureUpload);
auto part_data = std::move(detached_part_data.front());
@ -174,7 +174,7 @@ void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
ProfileEvents::increment(ProfileEvents::AzureCommitBlockList);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCommitBlockList);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, max_unexpected_write_error_retries);
@ -311,7 +311,7 @@ void WriteBufferFromAzureBlobStorage::writePart(WriteBufferFromAzureBlobStorage:
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
ProfileEvents::increment(ProfileEvents::AzureStageBlock);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureStageBlock);
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(std::get<1>(*worker_data).memory.data()), data_size);

View File

@ -28,7 +28,7 @@ class TaskTracker;
class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
{
public:
using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
using AzureClientPtr = std::shared_ptr<const AzureBlobStorage::ContainerClient>;
WriteBufferFromAzureBlobStorage(
AzureClientPtr blob_container_client_,

View File

@ -45,6 +45,7 @@ namespace Setting
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace AzureBlobStorage
@ -81,6 +82,50 @@ static bool isConnectionString(const std::string & candidate)
return !candidate.starts_with("http");
}
ContainerClientWrapper::ContainerClientWrapper(RawContainerClient client_, String blob_prefix_)
: client(std::move(client_)), blob_prefix(std::move(blob_prefix_))
{
}
BlobClient ContainerClientWrapper::GetBlobClient(const String & blob_name) const
{
return client.GetBlobClient(blob_prefix / blob_name);
}
BlockBlobClient ContainerClientWrapper::GetBlockBlobClient(const String & blob_name) const
{
return client.GetBlockBlobClient(blob_prefix / blob_name);
}
BlobContainerPropertiesRespones ContainerClientWrapper::GetProperties() const
{
return client.GetProperties();
}
ListBlobsPagedResponse ContainerClientWrapper::ListBlobs(const ListBlobsOptions & options) const
{
auto new_options = options;
new_options.Prefix = blob_prefix / options.Prefix.ValueOr("");
auto response = client.ListBlobs(new_options);
auto blob_prefix_str = blob_prefix.empty() ? "" : blob_prefix.string() + "/";
for (auto & blob : response.Blobs)
{
if (!blob.Name.starts_with(blob_prefix_str))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected prefix '{}' in blob name '{}'", blob_prefix_str, blob.Name);
blob.Name = blob.Name.substr(blob_prefix_str.size());
}
return response;
}
bool ContainerClientWrapper::IsClientForDisk() const
{
return client.GetClickhouseOptions().IsClientForDisk;
}
String ConnectionParams::getConnectionURL() const
{
if (std::holds_alternative<ConnectionString>(auth_method))
@ -99,7 +144,7 @@ std::unique_ptr<ServiceClient> ConnectionParams::createForService() const
if constexpr (std::is_same_v<T, ConnectionString>)
return std::make_unique<ServiceClient>(ServiceClient::CreateFromConnectionString(auth.toUnderType(), client_options));
else
return std::make_unique<ServiceClient>(endpoint.getEndpointWithoutContainer(), auth, client_options);
return std::make_unique<ServiceClient>(endpoint.getServiceEndpoint(), auth, client_options);
}, auth_method);
}
@ -108,9 +153,15 @@ std::unique_ptr<ContainerClient> ConnectionParams::createForContainer() const
return std::visit([this]<typename T>(const T & auth)
{
if constexpr (std::is_same_v<T, ConnectionString>)
return std::make_unique<ContainerClient>(ContainerClient::CreateFromConnectionString(auth.toUnderType(), endpoint.container_name, client_options));
{
auto raw_client = RawContainerClient::CreateFromConnectionString(auth.toUnderType(), endpoint.container_name, client_options);
return std::make_unique<ContainerClient>(std::move(raw_client), endpoint.prefix);
}
else
return std::make_unique<ContainerClient>(endpoint.getEndpoint(), auth, client_options);
{
RawContainerClient raw_client{endpoint.getContainerEndpoint(), auth, client_options};
return std::make_unique<ContainerClient>(std::move(raw_client), endpoint.prefix);
}
}, auth_method);
}
@ -245,7 +296,7 @@ void processURL(const String & url, const String & container_name, Endpoint & en
static bool containerExists(const ContainerClient & client)
{
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client.GetClickhouseOptions().IsClientForDisk)
if (client.IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
try
@ -283,7 +334,8 @@ std::unique_ptr<ContainerClient> getContainerClient(const ConnectionParams & par
if (params.client_options.ClickhouseOptions.IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCreateContainer);
return std::make_unique<ContainerClient>(service_client->CreateBlobContainer(params.endpoint.container_name).Value);
auto raw_client = service_client->CreateBlobContainer(params.endpoint.container_name).Value;
return std::make_unique<ContainerClient>(std::move(raw_client), params.endpoint.prefix);
}
catch (const Azure::Storage::StorageException & e)
{

View File

@ -14,6 +14,9 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context_fwd.h>
#include <base/strong_typedef.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
@ -23,11 +26,6 @@ struct Settings;
namespace AzureBlobStorage
{
using ServiceClient = Azure::Storage::Blobs::BlobServiceClient;
using ContainerClient = Azure::Storage::Blobs::BlobContainerClient;
using BlobClient = Azure::Storage::Blobs::BlobClient;
using BlobClientOptions = Azure::Storage::Blobs::BlobClientOptions;
struct RequestSettings
{
RequestSettings() = default;
@ -65,7 +63,7 @@ struct Endpoint
String sas_auth;
std::optional<bool> container_already_exists;
String getEndpoint() const
String getContainerEndpoint() const
{
String url = storage_account_url;
if (url.ends_with('/'))
@ -77,16 +75,13 @@ struct Endpoint
if (!container_name.empty())
url += "/" + container_name;
if (!prefix.empty())
url += "/" + prefix;
if (!sas_auth.empty())
url += "?" + sas_auth;
return url;
}
String getEndpointWithoutContainer() const
String getServiceEndpoint() const
{
String url = storage_account_url;
@ -100,6 +95,35 @@ struct Endpoint
}
};
using BlobClient = Azure::Storage::Blobs::BlobClient;
using BlockBlobClient = Azure::Storage::Blobs::BlockBlobClient;
using RawContainerClient = Azure::Storage::Blobs::BlobContainerClient;
using Azure::Storage::Blobs::ListBlobsOptions;
using Azure::Storage::Blobs::ListBlobsPagedResponse;
using BlobContainerPropertiesRespones = Azure::Response<Azure::Storage::Blobs::Models::BlobContainerProperties>;
/// A wrapper for ContainerClient that correctly handles the prefix of blobs.
/// See AzureBlobStorageEndpoint and processAzureBlobStorageEndpoint for details.
class ContainerClientWrapper
{
public:
ContainerClientWrapper(RawContainerClient client_, String blob_prefix_);
bool IsClientForDisk() const;
BlobClient GetBlobClient(const String & blob_name) const;
BlockBlobClient GetBlockBlobClient(const String & blob_name) const;
BlobContainerPropertiesRespones GetProperties() const;
ListBlobsPagedResponse ListBlobs(const ListBlobsOptions & options) const;
private:
RawContainerClient client;
fs::path blob_prefix;
};
using ContainerClient = ContainerClientWrapper;
using ServiceClient = Azure::Storage::Blobs::BlobServiceClient;
using BlobClientOptions = Azure::Storage::Blobs::BlobClientOptions;
using ConnectionString = StrongTypedef<String, struct ConnectionStringTag>;
using AuthMethod = std::variant<

View File

@ -13,7 +13,6 @@
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#include <Disks/ObjectStorages/ObjectStorageIteratorAsync.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
@ -52,7 +51,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync
public:
AzureIteratorAsync(
const std::string & path_prefix,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client_,
std::shared_ptr<const AzureBlobStorage::ContainerClient> client_,
size_t max_list_size)
: IObjectStorageIteratorAsync(
CurrentMetrics::ObjectStorageAzureThreads,
@ -69,7 +68,7 @@ private:
bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client->GetClickhouseOptions().IsClientForDisk)
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
batch.clear();
@ -97,7 +96,7 @@ private:
return true;
}
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
Azure::Storage::Blobs::ListBlobsOptions options;
};
@ -130,7 +129,7 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
try
@ -159,9 +158,6 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
{
auto client_ptr = client.get();
/// NOTE: list doesn't work if endpoint contains non-empty prefix for blobs.
/// See AzureBlobStorageEndpoint and processAzureBlobStorageEndpoint for details.
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = path;
if (max_keys)
@ -172,7 +168,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
for (auto blob_list_response = client_ptr->ListBlobs(options); blob_list_response.HasPage(); blob_list_response.MoveToNextPage())
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
blob_list_response = client_ptr->ListBlobs(options);
@ -246,10 +242,13 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
std::move(scheduler));
}
void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists)
void AzureObjectStorage::removeObjectImpl(
const StoredObject & object,
const std::shared_ptr<const AzureBlobStorage::ContainerClient> & client_ptr,
bool if_exists)
{
ProfileEvents::increment(ProfileEvents::AzureDeleteObjects);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureDeleteObjects);
const auto & path = object.remote_path;
@ -257,7 +256,7 @@ void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const Sha
try
{
auto delete_info = client_ptr->DeleteBlob(path);
auto delete_info = client_ptr->GetBlobClient(path).Delete();
if (!if_exists && !delete_info.Value.Deleted)
throw Exception(
ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file (path: {}) in AzureBlob Storage, reason: {}",
@ -268,7 +267,7 @@ void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const Sha
if (!if_exists)
throw;
/// If object doesn't exist...
/// If object doesn't exist.
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
return;
@ -298,7 +297,7 @@ ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) c
auto properties = blob_client.GetProperties().Value;
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
ObjectMetadata result;
@ -332,7 +331,7 @@ void AzureObjectStorage::copyObject( /// NOLINT
}
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
dest_blob_client.CopyFromUri(source_blob_client.GetUrl(), copy_options);
@ -350,7 +349,7 @@ void AzureObjectStorage::applyNewSettings(
if (!options.allow_client_change)
return;
bool is_client_for_disk = client.get()->GetClickhouseOptions().IsClientForDisk;
bool is_client_for_disk = client.get()->IsClientForDisk();
AzureBlobStorage::ConnectionParams params
{
@ -371,7 +370,7 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(
ContextPtr context)
{
auto new_settings = AzureBlobStorage::getRequestSettings(config, config_prefix, context);
bool is_client_for_disk = client.get()->GetClickhouseOptions().IsClientForDisk;
bool is_client_for_disk = client.get()->IsClientForDisk();
AzureBlobStorage::ConnectionParams params
{
@ -381,7 +380,7 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(
};
auto new_client = AzureBlobStorage::getContainerClient(params, /*readonly=*/ true);
return std::make_unique<AzureObjectStorage>(name, std::move(new_client), std::move(new_settings), new_namespace, params.endpoint.getEndpointWithoutContainer());
return std::make_unique<AzureObjectStorage>(name, std::move(new_client), std::move(new_settings), new_namespace, params.endpoint.getServiceEndpoint());
}
}

View File

@ -100,8 +100,10 @@ public:
bool supportParallelWrite() const override { return true; }
private:
using SharedAzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
void removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists);
void removeObjectImpl(
const StoredObject & object,
const std::shared_ptr<const AzureBlobStorage::ContainerClient> & client_ptr,
bool if_exists);
const String name;
/// client used to access the files in the Blob Storage cloud

View File

@ -118,7 +118,7 @@ public:
const FileCacheSettings & getCacheSettings() const { return cache_settings; }
#if USE_AZURE_BLOB_STORAGE
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() const override
std::shared_ptr<const AzureBlobStorage::ContainerClient> getAzureBlobStorageClient() const override
{
return object_storage->getAzureBlobStorageClient();
}

View File

@ -28,8 +28,7 @@
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Common/MultiVersion.h>
#include <azure/storage/blobs.hpp>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#endif
#if USE_AWS_S3
@ -256,7 +255,7 @@ public:
virtual void setKeysGenerator(ObjectStorageKeysGeneratorPtr) { }
#if USE_AZURE_BLOB_STORAGE
virtual std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() const
virtual std::shared_ptr<const AzureBlobStorage::ContainerClient> getAzureBlobStorageClient() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "This function is only implemented for AzureBlobStorage");
}

View File

@ -12,12 +12,6 @@
#include <Common/filesystemHelpers.h>
#include <filesystem>
#include <memory>
#include <optional>
#include <tuple>
#include <unordered_set>
#include <Poco/Timestamp.h>
namespace DB
{

View File

@ -307,7 +307,7 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
ObjectStorageType::Azure, config, config_prefix, name,
AzureBlobStorage::getContainerClient(params, /*readonly=*/ false), std::move(azure_settings),
params.endpoint.prefix.empty() ? params.endpoint.container_name : params.endpoint.container_name + "/" + params.endpoint.prefix,
params.endpoint.getEndpointWithoutContainer());
params.endpoint.getServiceEndpoint());
};
factory.registerObjectStorageType("azure_blob_storage", creator);

View File

@ -251,6 +251,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.values.deduce_templates_of_expressions = settings[Setting::input_format_values_deduce_templates_of_expressions];
format_settings.values.interpret_expressions = settings[Setting::input_format_values_interpret_expressions];
format_settings.values.escape_quote_with_quote = settings[Setting::output_format_values_escape_quote_with_quote];
format_settings.composed_data_type_output_format_mode = settings[Setting::composed_data_type_output_format_mode];
format_settings.with_names_use_header = settings[Setting::input_format_with_names_use_header];
format_settings.with_types_use_header = settings[Setting::input_format_with_types_use_header];
format_settings.write_statistics = settings[Setting::output_format_write_statistics];

View File

@ -38,6 +38,7 @@ struct FormatSettings
bool try_infer_variant = false;
bool seekable_read = true;
String composed_data_type_output_format_mode = "default";
UInt64 max_rows_to_read_for_schema_inference = 25000;
UInt64 max_bytes_to_read_for_schema_inference = 32 * 1024 * 1024;

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>

View File

@ -42,7 +42,7 @@ namespace
public:
UploadHelper(
const CreateReadBuffer & create_read_buffer_,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client_,
std::shared_ptr<const AzureBlobStorage::ContainerClient> client_,
size_t offset_,
size_t total_size_,
const String & dest_container_for_logging_,
@ -67,7 +67,7 @@ namespace
protected:
std::function<std::unique_ptr<SeekableReadBuffer>()> create_read_buffer;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
size_t offset;
size_t total_size;
const String & dest_container_for_logging;
@ -159,7 +159,7 @@ namespace
{
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
ProfileEvents::increment(ProfileEvents::AzureCommitBlockList);
if (client->GetClickhouseOptions().IsClientForDisk)
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCommitBlockList);
block_blob_client.CommitBlockList(block_ids);
@ -271,7 +271,7 @@ namespace
void processUploadPartRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::AzureStageBlock);
if (client->GetClickhouseOptions().IsClientForDisk)
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureStageBlock);
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
@ -322,7 +322,7 @@ void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> dest_client,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,
@ -335,8 +335,8 @@ void copyDataToAzureBlobStorageFile(
void copyAzureBlobStorageFile(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> src_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> dest_client,
const String & src_container_for_logging,
const String & src_blob,
size_t offset,
@ -353,7 +353,7 @@ void copyAzureBlobStorageFile(
{
LOG_TRACE(log, "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (dest_client->GetClickhouseOptions().IsClientForDisk)
if (dest_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
auto block_blob_client_src = src_client->GetBlockBlobClient(src_blob);

View File

@ -20,8 +20,8 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// Copies a file from AzureBlobStorage to AzureBlobStorage.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
void copyAzureBlobStorageFile(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> src_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> dest_client,
const String & src_container_for_logging,
const String & src_blob,
size_t src_offset,
@ -42,7 +42,7 @@ void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> client,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,

View File

@ -1,13 +1,22 @@
#include <IO/WriteHelpers.h>
#include <cinttypes>
#include <utility>
#include <Common/formatIPv6.h>
#include <base/DecomposedFloat.h>
#include <base/hex.h>
#include <Common/formatIPv6.h>
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wsign-compare"
#include <dragonbox/dragonbox_to_chars.h>
#pragma clang diagnostic pop
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER;
}
template <typename IteratorSrc, typename IteratorDst>
void formatHex(IteratorSrc src, IteratorDst dst, size_t num_bytes)
{
@ -127,4 +136,38 @@ String fourSpaceIndent(size_t indent)
{
return std::string(indent * 4, ' ');
}
template <typename T>
requires is_floating_point<T>
size_t writeFloatTextFastPath(T x, char * buffer)
{
Int64 result = 0;
if constexpr (std::is_same_v<T, Float64>)
{
/// The library dragonbox has low performance on integers.
/// This workaround improves performance 6..10 times.
if (DecomposedFloat64(x).isIntegerInRepresentableRange())
result = itoa(Int64(x), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(x, buffer) - buffer;
}
else if constexpr (std::is_same_v<T, Float32> || std::is_same_v<T, BFloat16>)
{
Float32 f32 = Float32(x);
if (DecomposedFloat32(f32).isIntegerInRepresentableRange())
result = itoa(Int32(f32), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(f32, buffer) - buffer;
}
if (result <= 0)
throw Exception(ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER, "Cannot print floating point number");
return result;
}
template size_t writeFloatTextFastPath(Float64 x, char * buffer);
template size_t writeFloatTextFastPath(Float32 x, char * buffer);
template size_t writeFloatTextFastPath(BFloat16 x, char * buffer);
}

View File

@ -4,8 +4,6 @@
#include <cstdio>
#include <limits>
#include <algorithm>
#include <iterator>
#include <concepts>
#include <bit>
#include <pcg-random/pcg_random.hpp>
@ -18,19 +16,14 @@
#include <Common/transformEndianness.h>
#include <base/find_symbols.h>
#include <base/StringRef.h>
#include <base/DecomposedFloat.h>
#include <Core/DecimalFunctions.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <base/IPv4andIPv6.h>
#include <Common/Exception.h>
#include <Common/StringUtils.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteIntText.h>
#include <IO/VarInt.h>
@ -38,24 +31,11 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wsign-compare"
#include <dragonbox/dragonbox_to_chars.h>
#pragma clang diagnostic pop
#include <Formats/FormatSettings.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER;
}
/// Helper functions for formatted and binary output.
inline void writeChar(char x, WriteBuffer & buf)
@ -151,41 +131,11 @@ inline void writeBoolText(bool x, WriteBuffer & buf)
template <typename T>
requires is_floating_point<T>
inline size_t writeFloatTextFastPath(T x, char * buffer)
{
Int64 result = 0;
size_t writeFloatTextFastPath(T x, char * buffer);
if constexpr (std::is_same_v<T, Float64>)
{
/// The library Ryu has low performance on integers.
/// This workaround improves performance 6..10 times.
if (DecomposedFloat64(x).isIntegerInRepresentableRange())
result = itoa(Int64(x), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(x, buffer) - buffer;
}
else if constexpr (std::is_same_v<T, Float32>)
{
if (DecomposedFloat32(x).isIntegerInRepresentableRange())
result = itoa(Int32(x), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(x, buffer) - buffer;
}
else if constexpr (std::is_same_v<T, BFloat16>)
{
Float32 f32 = Float32(x);
if (DecomposedFloat32(f32).isIntegerInRepresentableRange())
result = itoa(Int32(f32), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(f32, buffer) - buffer;
}
if (result <= 0)
throw Exception(ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER, "Cannot print floating point number");
return result;
}
extern template size_t writeFloatTextFastPath(Float64 x, char * buffer);
extern template size_t writeFloatTextFastPath(Float32 x, char * buffer);
extern template size_t writeFloatTextFastPath(BFloat16 x, char * buffer);
template <typename T>
requires is_floating_point<T>

View File

@ -398,8 +398,8 @@ ASTPtr parseAdditionalFilterConditionForTable(
for (const auto & additional_filter : additional_table_filters)
{
const auto & tuple = additional_filter.safeGet<const Tuple &>();
auto & table = tuple.at(0).safeGet<String>();
auto & filter = tuple.at(1).safeGet<String>();
const auto & table = tuple.at(0).safeGet<String>();
const auto & filter = tuple.at(1).safeGet<String>();
if (table == target.alias ||
(table == target.table && context.getCurrentDatabase() == target.database) ||

View File

@ -440,7 +440,7 @@ ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) co
}
// If the original column is DateTime64, check for sub-second precision
if (isDateTime64(column_to_cast.column->getDataType()))
if (isDateTime64(column_to_cast.column->getDataType()) && !isDateTime64(removeNullable(result)->getDataType()))
{
processDateTime64Column(column_to_cast, result, null_map_holder, null_map);
}

View File

@ -1540,7 +1540,7 @@ std::pair<ASTPtr, BlockIO> executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
if (format_name == "Null")
if (boost::iequals(format_name, "Null"))
res.null_format = true;
}

View File

@ -142,7 +142,7 @@ size_t TokenInfo::getTotalSize() const
return size + parts.size() - 1;
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
void CheckTokenTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();

View File

@ -93,7 +93,7 @@ namespace DeduplicationToken
};
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
/// use that class only with debug builds in CI for introspection
class CheckTokenTransform : public ISimpleTransform
{

View File

@ -381,7 +381,7 @@ std::optional<Chain> generateViewChain(
table_prefers_large_blocks ? settings[Setting::min_insert_block_size_bytes] : 0ULL));
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Before squashing", out.getInputHeader()));
#endif
@ -427,7 +427,7 @@ std::optional<Chain> generateViewChain(
if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right after Inner query", out.getInputHeader()));
#endif
@ -450,7 +450,7 @@ std::optional<Chain> generateViewChain(
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right before Inner query", out.getInputHeader()));
#endif
}

View File

@ -364,7 +364,7 @@ void RefreshTask::refreshTask()
if (coordination.root_znode.last_attempt_replica == coordination.replica_name)
{
LOG_ERROR(log, "Znode {} indicates that this replica is running a refresh, but it isn't. Likely a bug.", coordination.path + "/running");
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
abortOnFailedAssertion("Unexpected refresh lock in keeper");
#else
coordination.running_znode_exists = false;

View File

@ -88,7 +88,8 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, Merg
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const IReservation * reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
path = fs::path(storage.getFullPathOnDisk(reservation->getDisk())) / name;
path += "/";
}
}

View File

@ -387,7 +387,7 @@ IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::getIndex() const
IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::loadIndexToCache(PrimaryIndexCache & index_cache) const
{
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
auto callback = [this] { return loadIndex(); };
return index_cache.getOrSet(key, callback);
}
@ -398,7 +398,7 @@ void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
if (!index)
return;
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
index_cache.set(key, std::const_pointer_cast<Index>(index));
index.reset();
@ -406,6 +406,15 @@ void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
projection->moveIndexToCache(index_cache);
}
void IMergeTreeDataPart::removeIndexFromCache(PrimaryIndexCache * index_cache) const
{
if (!index_cache)
return;
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
index_cache->remove(key);
}
void IMergeTreeDataPart::setIndex(Columns index_columns)
{
std::scoped_lock lock(index_mutex);
@ -574,17 +583,49 @@ bool IMergeTreeDataPart::isMovingPart() const
return part_directory_path.parent_path().filename() == "moving";
}
void IMergeTreeDataPart::clearCaches()
{
if (cleared_data_in_caches.exchange(true) || is_duplicate)
return;
size_t uncompressed_bytes = getBytesUncompressedOnDisk();
/// Remove index and marks from cache if it was prewarmed to avoid threshing it with outdated data.
/// Do not remove in other cases to avoid extra contention on caches.
removeMarksFromCache(storage.getMarkCacheToPrewarm(uncompressed_bytes).get());
removeIndexFromCache(storage.getPrimaryIndexCacheToPrewarm(uncompressed_bytes).get());
}
bool IMergeTreeDataPart::mayStoreDataInCaches() const
{
size_t uncompressed_bytes = getBytesUncompressedOnDisk();
auto mark_cache = storage.getMarkCacheToPrewarm(uncompressed_bytes);
auto index_cache = storage.getPrimaryIndexCacheToPrewarm(uncompressed_bytes);
return (mark_cache || index_cache) && !cleared_data_in_caches;
}
void IMergeTreeDataPart::removeIfNeeded() noexcept
{
assert(assertHasValidVersionMetadata());
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
std::string path;
try
{
path = getDataPartStorage().getRelativePath();
clearCaches();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
}
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
try
{
if (!getDataPartStorage().exists()) // path
return;
@ -2113,6 +2154,11 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const S
return {};
}
String IMergeTreeDataPart::getRelativePathOfActivePart() const
{
return fs::path(getDataPartStorage().getFullRootPath()) / name / "";
}
void IMergeTreeDataPart::renameToDetached(const String & prefix)
{
auto path_to_detach = getRelativePathForDetachedPart(prefix, /* broken */ false);

View File

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <unordered_map>
#include <IO/WriteSettings.h>
#include <Core/Block.h>
@ -185,6 +186,15 @@ public:
/// Loads marks and saves them into mark cache for specified columns.
virtual void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const = 0;
/// Removes marks from cache for all columns in part.
virtual void removeMarksFromCache(MarkCache * mark_cache) const = 0;
/// Removes data related to data part from mark and primary index caches.
void clearCaches();
/// Returns true if data related to data part may be stored in mark and primary index caches.
bool mayStoreDataInCaches() const;
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
@ -376,6 +386,7 @@ public:
IndexPtr getIndex() const;
IndexPtr loadIndexToCache(PrimaryIndexCache & index_cache) const;
void moveIndexToCache(PrimaryIndexCache & index_cache);
void removeIndexFromCache(PrimaryIndexCache * index_cache) const;
void setIndex(Columns index_columns);
void unloadIndex();
@ -436,6 +447,10 @@ public:
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
/// This method ignores current tmp prefix of part and returns
/// the name of part when it was or will be in Active state.
String getRelativePathOfActivePart() const;
bool isProjectionPart() const { return parent_part != nullptr; }
/// Check if the part is in the `/moving` directory
@ -757,6 +772,9 @@ private:
/// This ugly flag is needed for debug assertions only
mutable bool part_is_probably_removed_from_disk = false;
/// If it's true then data related to this part is cleared from mark and index caches.
mutable std::atomic_bool cleared_data_in_caches = false;
};
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;

View File

@ -120,10 +120,4 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
return remove_files;
}
PlainMarksByName IMergedBlockOutputStream::releaseCachedMarks()
{
if (!writer)
return {};
return writer->releaseCachedMarks();
}
}

View File

@ -35,7 +35,10 @@ public:
return writer->getIndexGranularity();
}
PlainMarksByName releaseCachedMarks();
PlainMarksByName releaseCachedMarks()
{
return writer ? writer->releaseCachedMarks() : PlainMarksByName{};
}
size_t getNumberOfOpenStreams() const
{
@ -43,7 +46,6 @@ public:
}
protected:
/// Remove all columns marked expired in data_part. Also, clears checksums
/// and columns array. Return set of removed files names.
NameSet removeEmptyColumnsFromPart(

View File

@ -445,12 +445,14 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
if (auto mark_cache = storage.getMarkCacheToPrewarm())
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = storage.getMarkCacheToPrewarm(bytes_uncompressed))
addMarksToCache(*part, cached_marks, mark_cache.get());
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
part->moveIndexToCache(*index_cache);
write_part_log({});

View File

@ -152,13 +152,15 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
if (auto mark_cache = storage.getMarkCacheToPrewarm())
size_t bytes_uncompressed = new_part->getBytesUncompressedOnDisk();
if (auto mark_cache = storage.getMarkCacheToPrewarm(bytes_uncompressed))
{
auto marks = merge_task->releaseCachedMarks();
addMarksToCache(*new_part, marks, mark_cache.get());
}
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.

View File

@ -578,6 +578,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec,
std::move(index_granularity_ptr),
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
global_ctx->merge_list_element_ptr->total_size_bytes_compressed,
/*reset_columns=*/ true,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());
@ -1125,6 +1126,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
ctx->compression_codec,
global_ctx->to->getIndexGranularity(),
global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed,
&global_ctx->written_offset_columns);
ctx->column_elems_written = 0;

View File

@ -238,6 +238,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsBool prewarm_mark_cache;
extern const MergeTreeSettingsBool primary_key_lazy_load;
extern const MergeTreeSettingsBool enforce_index_structure_match_on_partition_manipulation;
extern const MergeTreeSettingsUInt64 min_bytes_to_prewarm_caches;
}
namespace ServerSetting
@ -2366,19 +2367,31 @@ PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCache() const
return getContext()->getPrimaryIndexCache();
}
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm() const
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_primary_key_cache])
return nullptr;
/// Do not load data to caches for small parts because
/// they will be likely replaced by merge immediately.
size_t min_bytes_to_prewarm = (*getSettings())[MergeTreeSetting::min_bytes_to_prewarm_caches];
if (part_uncompressed_bytes && part_uncompressed_bytes < min_bytes_to_prewarm)
return nullptr;
return getPrimaryIndexCache();
}
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm() const
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return nullptr;
/// Do not load data to caches for small parts because
/// they will be likely replaced by merge immediately.
size_t min_bytes_to_prewarm = (*getSettings())[MergeTreeSetting::min_bytes_to_prewarm_caches];
if (part_uncompressed_bytes && part_uncompressed_bytes < min_bytes_to_prewarm)
return nullptr;
return getContext()->getMarkCache();
}
@ -9035,7 +9048,8 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
ColumnsStatistics{},
compression_codec,
std::make_shared<MergeTreeIndexGranularityAdaptive>(),
txn ? txn->tid : Tx::PrehistoricTID);
txn ? txn->tid : Tx::PrehistoricTID,
/*part_uncompressed_bytes=*/ 0);
bool sync_on_insert = (*settings)[MergeTreeSetting::fsync_after_insert];
@ -9108,14 +9122,14 @@ void MergeTreeData::unloadPrimaryKeys()
}
}
size_t MergeTreeData::unloadPrimaryKeysOfOutdatedParts()
size_t MergeTreeData::unloadPrimaryKeysAndClearCachesOfOutdatedParts()
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(unload_primary_key_mutex, std::defer_lock);
if (!lock.try_lock())
return 0;
DataPartsVector parts_to_unload_index;
DataPartsVector parts_to_clear;
{
auto parts_lock = lockParts();
@ -9126,18 +9140,22 @@ size_t MergeTreeData::unloadPrimaryKeysOfOutdatedParts()
/// Outdated part may be hold by SELECT query and still needs the index.
/// This check requires lock of index_mutex but if outdated part is unique then there is no
/// contention on it, so it's relatively cheap and it's ok to check under a global parts lock.
if (isSharedPtrUnique(part) && part->isIndexLoaded())
parts_to_unload_index.push_back(part);
if (isSharedPtrUnique(part) && (part->isIndexLoaded() || part->mayStoreDataInCaches()))
parts_to_clear.push_back(part);
}
}
for (const auto & part : parts_to_unload_index)
for (const auto & part : parts_to_clear)
{
const_cast<IMergeTreeDataPart &>(*part).unloadIndex();
auto & part_mut = const_cast<IMergeTreeDataPart &>(*part);
part_mut.unloadIndex();
part_mut.clearCaches();
LOG_TEST(log, "Unloaded primary key for outdated part {}", part->name);
}
return parts_to_unload_index.size();
return parts_to_clear.size();
}
void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)

View File

@ -511,9 +511,9 @@ public:
/// Returns a pointer to primary index cache if it is enabled.
PrimaryIndexCachePtr getPrimaryIndexCache() const;
/// Returns a pointer to primary index cache if it is enabled and required to be prewarmed.
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm() const;
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Returns a pointer to primary mark cache if it is required to be prewarmed.
MarkCachePtr getMarkCacheToPrewarm() const;
MarkCachePtr getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Prewarm mark cache and primary index cache for the most recent data parts.
void prewarmCaches(ThreadPool & pool, MarkCachePtr mark_cache, PrimaryIndexCachePtr index_cache);
@ -1166,7 +1166,7 @@ public:
/// Unloads primary keys of outdated parts that are not used by any query.
/// Returns the number of parts for which index was unloaded.
size_t unloadPrimaryKeysOfOutdatedParts();
size_t unloadPrimaryKeysAndClearCachesOfOutdatedParts();
protected:
friend class IMergeTreeDataPart;
@ -1335,7 +1335,7 @@ protected:
std::mutex grab_old_parts_mutex;
/// The same for clearOldTemporaryDirectories.
std::mutex clear_old_temporary_directories_mutex;
/// The same for unloadPrimaryKeysOfOutdatedParts.
/// The same for unloadPrimaryKeysAndClearCachesOfOutdatedParts.
std::mutex unload_primary_key_mutex;
void checkProperties(

View File

@ -175,6 +175,16 @@ void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, Mark
loader.loadMarks();
}
void MergeTreeDataPartCompact::removeMarksFromCache(MarkCache * mark_cache) const
{
if (!mark_cache)
return;
auto mark_path = index_granularity_info.getMarksFilePath(DATA_FILE_NAME);
auto key = MarkCache::hash(fs::path(getRelativePathOfActivePart()) / mark_path);
mark_cache->remove(key);
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
{
if (!getColumnPosition(column.getNameInStorage()))

View File

@ -55,6 +55,7 @@ public:
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
void removeMarksFromCache(MarkCache * mark_cache) const override;
~MergeTreeDataPartCompact() override;

View File

@ -244,6 +244,27 @@ void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCac
loader->loadMarks();
}
void MergeTreeDataPartWide::removeMarksFromCache(MarkCache * mark_cache) const
{
if (!mark_cache)
return;
const auto & serializations = getSerializations();
for (const auto & [column_name, serialization] : serializations)
{
serialization->enumerateStreams([&](const auto & subpath)
{
auto stream_name = getStreamNameForColumn(column_name, subpath, checksums);
if (!stream_name)
return;
auto mark_path = index_granularity_info.getMarksFilePath(*stream_name);
auto key = MarkCache::hash(fs::path(getRelativePathOfActivePart()) / mark_path);
mark_cache->remove(key);
});
}
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{
return getDataPartStorage().isStoredOnRemoteDisk();

View File

@ -52,6 +52,7 @@ public:
std::optional<time_t> getColumnModificationTime(const String & column_name) const override;
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
void removeMarksFromCache(MarkCache * mark_cache) const override;
protected:
static void loadIndexGranularityImpl(

View File

@ -225,12 +225,13 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
projection->getDataPartStorage().precommitTransaction();
}
/// This method must be called after rename and commit of part
/// because a correct path is required for the keys of caches.
void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
{
/// This method must be called after rename and commit of part
/// because a correct path is required for the keys of caches.
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = part->storage.getMarkCacheToPrewarm())
if (auto mark_cache = part->storage.getMarkCacheToPrewarm(bytes_uncompressed))
{
for (const auto & stream : streams)
{
@ -239,7 +240,7 @@ void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
}
}
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
/// Index was already set during writing. Now move it to cache.
part->moveIndexToCache(*index_cache);
@ -726,6 +727,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
compression_codec,
std::move(index_granularity_ptr),
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
block.bytes(),
/*reset_columns=*/ false,
/*blocks_are_granules_size=*/ false,
context->getWriteSettings());
@ -880,6 +882,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
compression_codec,
std::move(index_granularity_ptr),
Tx::PrehistoricTID,
block.bytes(),
/*reset_columns=*/ false,
/*blocks_are_granules_size=*/ false,
data.getContext()->getWriteSettings());

View File

@ -16,6 +16,7 @@ namespace ProfileEvents
{
extern const Event WaitMarksLoadMicroseconds;
extern const Event BackgroundLoadingMarksTasks;
extern const Event LoadedMarksFiles;
extern const Event LoadedMarksCount;
extern const Event LoadedMarksMemoryBytes;
}
@ -203,6 +204,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
ProfileEvents::increment(ProfileEvents::LoadedMarksFiles);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * num_columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksMemoryBytes, res->approximateMemoryUsage());
@ -264,7 +266,7 @@ void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & c
for (const auto & [stream_name, marks] : cached_marks)
{
auto mark_path = part.index_granularity_info.getMarksFilePath(stream_name);
auto key = MarkCache::hash(fs::path(part.getDataPartStorage().getFullPath()) / mark_path);
auto key = MarkCache::hash(fs::path(part.getRelativePathOfActivePart()) / mark_path);
mark_cache->set(key, std::make_shared<MarksInCompressedFile>(*marks));
}
}

View File

@ -83,6 +83,9 @@ struct MergeTreeSettings;
/// Adds computed marks for part to the marks cache.
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache);
/// Removes cached marks for all columns from part.
void removeMarksFromCache(const IMergeTreeDataPart & part, MarkCache * mark_cache);
/// Returns the list of columns suitable for prewarming of mark cache according to settings.
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list);

View File

@ -244,6 +244,7 @@ namespace ErrorCodes
DECLARE(Bool, prewarm_primary_key_cache, false, "If true primary index cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(Bool, prewarm_mark_cache, false, "If true mark cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(String, columns_to_prewarm_mark_cache, "", "List of columns to prewarm mark cache for (if enabled). Empty means all columns", 0) \
DECLARE(UInt64, min_bytes_to_prewarm_caches, 0, "Minimal size (uncomressed bytes) to prewarm mark cache and primary index cache for new parts", 0) \
/** Projection settings. */ \
DECLARE(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
DECLARE(LightweightMutationProjectionMode, lightweight_mutation_projection_mode, LightweightMutationProjectionMode::THROW, "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop projections of this table's relevant parts, or rebuild the projections.", 0) \

View File

@ -29,6 +29,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
size_t part_uncompressed_bytes,
bool reset_columns_,
bool blocks_are_granules_size,
const WriteSettings & write_settings_)
@ -38,9 +39,9 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, write_settings(write_settings_)
{
/// Save marks in memory if prewarm is enabled to avoid re-reading marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm(part_uncompressed_bytes) != nullptr;
/// Save primary index in memory if cache is disabled or is enabled with prewarm to avoid re-reading primary index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm(part_uncompressed_bytes);
MergeTreeWriterSettings writer_settings(
data_part->storage.getContext()->getSettingsRef(),

View File

@ -24,6 +24,7 @@ public:
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
size_t part_uncompressed_bytes,
bool reset_columns_ = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {});

View File

@ -21,13 +21,14 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
size_t part_uncompressed_bytes,
WrittenOffsetColumns * offset_columns)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
/// Save marks in memory if prewarm is enabled to avoid re-reading marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm(part_uncompressed_bytes) != nullptr;
/// Save primary index in memory if cache is disabled or is enabled with prewarm to avoid re-reading priamry index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm(part_uncompressed_bytes);
/// Granularity is never recomputed while writing only columns.
MergeTreeWriterSettings writer_settings(

View File

@ -22,6 +22,7 @@ public:
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
size_t part_uncompressed_bytes,
WrittenOffsetColumns * offset_columns = nullptr);
void write(const Block & block) override;

View File

@ -1625,8 +1625,8 @@ private:
else
{
index_granularity_ptr = createMergeTreeIndexGranularity(
ctx->new_data_part->rows_count,
ctx->new_data_part->getBytesUncompressedOnDisk(),
ctx->source_part->rows_count,
ctx->source_part->getBytesUncompressedOnDisk(),
*ctx->data->getSettings(),
ctx->new_data_part->index_granularity_info,
/*blocks_are_granules=*/ false);
@ -1641,6 +1641,7 @@ private:
ctx->compression_codec,
std::move(index_granularity_ptr),
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
ctx->source_part->getBytesUncompressedOnDisk(),
/*reset_columns=*/ true,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings());
@ -1876,7 +1877,8 @@ private:
std::vector<MergeTreeIndexPtr>(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()),
ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()),
ctx->compression_codec,
ctx->source_part->index_granularity);
ctx->source_part->index_granularity,
ctx->source_part->getBytesUncompressedOnDisk());
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);

View File

@ -193,7 +193,7 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
cleaned_part_like += storage.clearEmptyParts();
}
cleaned_part_like += storage.unloadPrimaryKeysOfOutdatedParts();
cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts();
/// We need to measure the number of removed objects somehow (for better scheduling),
/// but just summing the number of removed async blocks, logs, and empty parts does not make any sense.

View File

@ -130,7 +130,6 @@ private:
std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
void prewarmCaches(const MergeTreeDataWriter::TemporaryPart & temp_part) const;
};
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;

View File

@ -209,7 +209,7 @@ struct DeltaLakeMetadataImpl
if (!object)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to parse metadata file");
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
object->stringify(oss);
LOG_TEST(log, "Metadata: {}", oss.str());

View File

@ -158,8 +158,8 @@ StorageMergeTree::StorageMergeTree(
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
getMarkCacheToPrewarm(0),
getPrimaryIndexCacheToPrewarm(0));
}
@ -1522,7 +1522,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
cleared_count += clearOldPartsFromFilesystem();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
cleared_count += unloadPrimaryKeysOfOutdatedParts();
cleared_count += unloadPrimaryKeysAndClearCachesOfOutdatedParts();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);

View File

@ -515,8 +515,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
getMarkCacheToPrewarm(0),
getPrimaryIndexCacheToPrewarm(0));
if (LoadingStrictnessLevel::ATTACH <= mode)
{
@ -5089,13 +5089,15 @@ bool StorageReplicatedMergeTree::fetchPart(
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
if (auto mark_cache = getMarkCacheToPrewarm())
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = getMarkCacheToPrewarm(bytes_uncompressed))
{
auto column_names = getColumnsToPrewarmMarks(*getSettings(), part->getColumns());
part->loadMarksToCache(column_names, mark_cache.get());
}
if (auto index_cache = getPrimaryIndexCacheToPrewarm())
if (auto index_cache = getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
part->loadIndexToCache(*index_cache);
}

View File

@ -1596,7 +1596,7 @@ void StorageWindowView::writeIntoWindowView(
return std::make_shared<DeduplicationToken::SetViewBlockNumberTransform>(stream_header);
});
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer tmp table before squashing", stream_header);
@ -1643,7 +1643,7 @@ void StorageWindowView::writeIntoWindowView(
lateness_upper_bound);
});
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer WatermarkTransform", stream_header);
@ -1668,7 +1668,7 @@ void StorageWindowView::writeIntoWindowView(
builder.addSimpleTransform([&](const Block & header_) { return std::make_shared<ExpressionTransform>(header_, convert_actions); });
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Before out", stream_header);

View File

@ -55,6 +55,8 @@ source /repo/tests/docker_scripts/utils.lib
# install test configs
/repo/tests/config/install.sh
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --silent --inMemoryPersistence &
/repo/tests/docker_scripts/setup_minio.sh stateless
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml

View File

@ -80,6 +80,7 @@ def _check_exception(exception, expected_tries=3):
for i, line in enumerate(lines[3 : 3 + expected_tries]):
expected_lines = (
"Code: 209. " + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
"Code: 209. " + EXCEPTION_NETWORK + "Timeout: connect timed out",
EXCEPTION_CONNECT_TIMEOUT,
EXCEPTION_TIMEOUT,
)

View File

@ -191,8 +191,9 @@ def test_invalid_snapshot(started_cluster):
]
)
node.start_clickhouse(start_wait_sec=120, expected_to_fail=True)
assert node.contains_in_log("Failure to load from latest snapshot with index")
assert node.contains_in_log(
"Aborting because of failure to load from latest snapshot with index"
"Manual intervention is necessary for recovery. Problematic snapshot can be removed but it will lead to data loss"
)
node.stop_clickhouse()

View File

@ -0,0 +1,11 @@
10006
0 0 0
1 1 1
1 2 0
2 2 2
2 2 2
3 1 9
3 3 3
4 4 4
4 7 7
5 5 5

View File

@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-shared-merge-tree
# Tag no-fasttest: requires Azure
# Tag no-shared-merge-tree: does not support replication
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
container="cont-$(echo "${CLICKHOUSE_TEST_UNIQUE_NAME}" | tr _ -)"
${CLICKHOUSE_CLIENT} --query "drop table if exists test_azure_mt"
${CLICKHOUSE_CLIENT} -nm --query "
create table test_azure_mt (a Int32, b Int64, c Int64) engine = MergeTree() partition by intDiv(a, 1000) order by tuple(a, b)
settings disk = disk(
type = object_storage,
metadata_type = plain_rewritable,
object_storage_type = azure_blob_storage,
name = '${container}',
path='/var/lib/clickhouse/disks/${container}/tables',
container_name = '${container}',
endpoint = 'http://localhost:10000/devstoreaccount1/${container}/plain-tables',
account_name = 'devstoreaccount1',
account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==');
"
${CLICKHOUSE_CLIENT} -nm --query "
insert into test_azure_mt (*) values (1, 2, 0), (2, 2, 2), (3, 1, 9), (4, 7, 7), (5, 10, 2), (6, 12, 5);
insert into test_azure_mt (*) select number, number, number from numbers_mt(10000);
select count(*) from test_azure_mt;
select (*) from test_azure_mt order by tuple(a, b) limit 10;
"
${CLICKHOUSE_CLIENT} --query "optimize table test_azure_mt final"
${CLICKHOUSE_CLIENT} -m --query "
alter table test_azure_mt add projection test_azure_mt_projection (select * order by b)" 2>&1 | grep -Fq "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} -nm --query "
alter table test_azure_mt update c = 0 where a % 2 = 1;
alter table test_azure_mt add column d Int64 after c;
alter table test_azure_mt drop column c;
" 2>&1 | grep -Fq "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} -nm --query "
detach table test_azure_mt;
attach table test_azure_mt;
"
${CLICKHOUSE_CLIENT} --query "drop table test_azure_mt sync"

View File

@ -0,0 +1,16 @@
-- array format --
[\'1\']
[1, 2, abc, \'1\']
[1, 2, abc, \'1\']
[1, 2, abc, \'1\']
[1, 2, abc, \'1\']
-- map format --
{1343 -> fe, afe -> fefe}
{1343 -> fe, afe -> fefe}
{1343 -> fe, afe -> fefe}
{1343 -> fe, afe -> fefe}
-- tuple format --
(1, 3, abc)
(1, 3, abc)
(1, 3, abc)
(1, 3, abc)

View File

@ -0,0 +1,18 @@
SELECT '-- array format --';
SELECT CAST(array('\'1\'') , 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), '2', 'abc', '\'1\''], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), materialize('2'), 'abc', '\'1\''], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), materialize('2'), materialize('abc'), '\'1\''], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), materialize('2'), materialize('abc'), materialize('\'1\'')], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT '-- map format --';
SELECT toString(map('1343', 'fe', 'afe', 'fefe')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString(map(materialize('1343'), materialize('fe'), 'afe', 'fefe')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString(map(materialize('1343'), materialize('fe'), materialize('afe'), 'fefe')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString(map(materialize('1343'), materialize('fe'), materialize('afe'), materialize('fefe'))) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT '-- tuple format --';
SELECT toString(('1', '3', 'abc')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString((materialize('1'), '3', 'abc')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString((materialize('1'), materialize('3'), 'abc')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString((materialize('1'), materialize('3'), materialize('abc'))) SETTINGS composed_data_type_output_format_mode = 'spark';

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
query_id="03276_null_format_matching_case_insensitive_$RANDOM$RANDOM"
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "select * from numbers_mt(1e8) format null settings max_rows_to_read=0"
$CLICKHOUSE_CLIENT -q "
SYSTEM FLUSH LOGS;
-- SendBytes should be close to 0, previously for this query it was around 800MB
select ProfileEvents['NetworkSendBytes'] < 1e6 from system.query_log where current_database = currentDatabase() and event_date >= yesterday() and query_id = '$query_id' and type = 'QueryFinish';
"

View File

@ -0,0 +1,2 @@
42
1 1

View File

@ -0,0 +1,9 @@
SELECT 42 SETTINGS log_comment='03277_logging_elapsed_ns';
SYSTEM FLUSH LOGS;
SELECT
ProfileEvents['LogDebug'] + ProfileEvents['LogTrace'] > 0,
ProfileEvents['LoggerElapsedNanoseconds'] > 0
FROM system.query_log
WHERE current_database = currentDatabase() AND log_comment = '03277_logging_elapsed_ns' AND type = 'QueryFinish';

View File

@ -0,0 +1,12 @@
MarkCacheFiles 3
PrimaryIndexCacheFiles 1
550
MarkCacheFiles 5
PrimaryIndexCacheFiles 2
550
MarkCacheFiles 3
PrimaryIndexCacheFiles 1
MarkCacheFiles 0
PrimaryIndexCacheFiles 0
2 1
0 0

View File

@ -0,0 +1,76 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-random-merge-tree-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
CREATE TABLE t_prewarm_cache_rmt_1 (a UInt64, b UInt64, c UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/03277_prewarms_caches/t_prewarm_cache', '1')
ORDER BY a
SETTINGS
index_granularity = 100,
min_bytes_for_wide_part = 0,
use_primary_key_cache = 1,
prewarm_primary_key_cache = 1,
prewarm_mark_cache = 1,
max_cleanup_delay_period = 1,
cleanup_delay_period = 1,
min_bytes_to_prewarm_caches = 30000;
SYSTEM DROP MARK CACHE;
SYSTEM DROP PRIMARY INDEX CACHE;
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(100, 100);
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(1000, 2000);
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a >= 100 AND a < 2000 AND NOT ignore(a, b);
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
SYSTEM DROP MARK CACHE;
SYSTEM DROP PRIMARY INDEX CACHE;
OPTIMIZE TABLE t_prewarm_cache_rmt_1 FINAL;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a >= 100 AND a < 2000 AND NOT ignore(a, b);
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
TRUNCATE TABLE t_prewarm_cache_rmt_1;
"
for _ in {1..100}; do
res=$($CLICKHOUSE_CLIENT -q "
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT value FROM system.asynchronous_metrics WHERE metric = 'PrimaryIndexCacheFiles';
")
if [[ $res -eq 0 ]]; then
break
fi
sleep 0.3
done
$CLICKHOUSE_CLIENT --query "
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
SYSTEM FLUSH LOGS;
SELECT
ProfileEvents['LoadedMarksFiles'],
ProfileEvents['LoadedPrimaryIndexFiles']
FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND query LIKE 'SELECT count() FROM t_prewarm_cache_rmt_1%'
ORDER BY event_time_microseconds;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
"

View File

@ -0,0 +1,2 @@
1 2001-01-11 01:11:21.100 2001-01-11 01:11:21.100
1 2001-01-11 01:11:21.100 2001-01-11 01:11:21.100

View File

@ -0,0 +1,9 @@
CREATE TABLE datetime64_issue (id int, dt DateTime64(3), dtn Nullable(DateTime64(3))) ENGINE = MergeTree() ORDER BY id PRIMARY KEY id;
INSERT INTO datetime64_issue(id, dt, dtn) VALUES (1, toDateTime64('2001-01-11 01:11:21.100', 3), toDateTime64('2001-01-11 01:11:21.100', 3));
SELECT * FROM datetime64_issue WHERE dt in (toDateTime64('2001-01-11 01:11:21.100', 3));
SELECT * FROM datetime64_issue WHERE dtn in (toDateTime64('2001-01-11 01:11:21.100', 3));
DROP TABLE datetime64_issue;

View File

@ -69,6 +69,7 @@ EXTERN_TYPES_EXCLUDES=(
ProfileEvents::end
ProfileEvents::increment
ProfileEvents::incrementForLogMessage
ProfileEvents::incrementLoggerElapsedNanoseconds
ProfileEvents::getName
ProfileEvents::Timer
ProfileEvents::Type