Merge remote-tracking branch 'blessed/master' into exp_settings

This commit is contained in:
Raúl Marín 2024-07-03 18:55:48 +02:00
commit bfd91d1e72
32 changed files with 299 additions and 158 deletions

View File

@ -253,7 +253,7 @@ function run_tests()
try_run_with_retry 10 clickhouse-client -q "insert into system.zookeeper (name, path, value) values ('auxiliary_zookeeper2', '/test/chroot/', '')"
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
timeout -s TERM --preserve-status 120m clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --hung-check --print-time \
--no-drop-if-fail --test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt

View File

@ -152,7 +152,7 @@ SELECT * FROM test;
`MATERIALIZED expr`
Materialized expression. Values of such columns are always calculated, they cannot be specified in INSERT queries.
Materialized expression. Values of such columns are automatically calculated according to the specified materialized expression when rows are inserted. Values cannot be explicitly specified during `INSERT`s.
Also, columns with this type of default value are not included in the result of `SELECT *`. This is to preserve the invariant that the result of a `SELECT *` can always be inserted back into the table using `INSERT`. This behavior can be disabled with the setting `asterisk_include_materialized_columns`.
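For illustration, a minimal sketch of this behavior (table and column names are hypothetical): the column is computed from the expression on insert, omitted from `SELECT *`, and included again when `asterisk_include_materialized_columns` is enabled.

```sql
CREATE TABLE example_mat
(
    raw String,
    raw_length UInt64 MATERIALIZED length(raw)  -- always computed, never supplied in INSERT
)
ENGINE = MergeTree
ORDER BY tuple();

INSERT INTO example_mat (raw) VALUES ('hello');

SELECT * FROM example_mat;                 -- returns only `raw`
SELECT raw, raw_length FROM example_mat;   -- materialized columns must be named explicitly
SELECT * FROM example_mat
SETTINGS asterisk_include_materialized_columns = 1;  -- `raw_length` is now included
```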

View File

@ -269,9 +269,9 @@ FROM s3(
## Virtual Columns {#virtual-columns}
- `_path` — Path to the file. Type: `LowCardinality(String)`.
- `_file` — Name of the file. Type: `LowCardinality(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`.
- `_path` — Path to the file. Type: `LowCardinality(String)`. In case of an archive, shows the path in the format: "{path_to_archive}::{path_to_file_inside_archive}".
- `_file` — Name of the file. Type: `LowCardinality(String)`. In case of an archive, shows the name of the file inside the archive.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the file size is unknown, the value is `NULL`. In case of an archive, shows the uncompressed size of the file inside the archive (see the sketch after this list).
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
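As a hedged sketch of how these virtual columns surface archive information (the bucket URL and file layout below are hypothetical):

```sql
SELECT id, data, _path, _file, _size, _time
FROM s3(
    'https://mybucket.s3.amazonaws.com/backups/archive1.zip :: example1.csv',
    'CSV',
    'id UInt32, data String');
-- _path -> '.../archive1.zip::example1.csv'
-- _file -> 'example1.csv'
-- _size -> uncompressed size of example1.csv inside the archive
```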
## Storage Settings {#storage-settings}

View File

@ -80,8 +80,8 @@ These functions can be used only as a window function.
- `nth_value(x, offset)` - Return the first non-NULL value evaluated against the nth row (offset) in its ordered frame.
- `rank()` - Rank the current row within its partition with gaps.
- `dense_rank()` - Rank the current row within its partition without gaps.
- `lagInFrame(x)` - Return a value evaluated at the row that is at a specified physical offset row before the current row within the ordered frame.
- `leadInFrame(x)` - Return a value evaluated at the row that is offset rows after the current row within the ordered frame.
- `lagInFrame(x[, offset[, default]])` - Return a value evaluated at the row that is a specified number of rows (offset) before the current row within the ordered frame. The offset parameter, if not specified, defaults to 1, meaning it will fetch the value from the previous row. If the calculated row exceeds the boundaries of the window frame, the specified default value is returned.
- `leadInFrame(x[, offset[, default]])` - Return a value evaluated at the row that is offset rows after the current row within the ordered frame. If offset is not provided, it defaults to 1. If the offset leads to a position outside the window frame, the specified default value is used.
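A small sketch of the new optional `offset` and `default` arguments (the data below is synthetic). Note that both functions only look within the frame, so the frame is extended explicitly:

```sql
SELECT
    number,
    lagInFrame(number, 2, 999)  OVER w AS two_back,   -- 999 when fewer than 2 preceding rows in the frame
    leadInFrame(number, 2, 999) OVER w AS two_ahead   -- 999 when fewer than 2 following rows in the frame
FROM numbers(5)
WINDOW w AS (ORDER BY number ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING);
```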
## Examples

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Columns/ColumnNullable.h>
#include <Common/assert_cast.h>
#include <IO/ReadHelpers.h>
@ -47,9 +48,17 @@ Chunk ODBCSource::generate()
for (int idx = 0; idx < result.columns(); ++idx)
{
const auto & sample = description.sample_block.getByPosition(idx);
if (!result.is_null(idx))
insertValue(*columns[idx], removeNullable(sample.type), description.types[idx].first, result, idx);
{
if (columns[idx]->isNullable())
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), removeNullable(sample.type), description.types[idx].first, result, idx);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], removeNullable(sample.type), description.types[idx].first, result, idx);
}
else
insertDefaultValue(*columns[idx], *sample.column);
}

View File

@ -1206,11 +1206,8 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
if (local_format_error)
std::rethrow_exception(local_format_error);
if (cancelled && is_interactive)
{
if (cancelled && is_interactive && !cancelled_printed.exchange(true))
output_stream << "Query was cancelled." << std::endl;
cancelled_printed = true;
}
}
@ -1326,7 +1323,7 @@ void ClientBase::onEndOfStream()
if (is_interactive)
{
if (cancelled && !cancelled_printed)
if (cancelled && !cancelled_printed.exchange(true))
output_stream << "Query was cancelled." << std::endl;
else if (!written_first_block)
output_stream << "Ok." << std::endl;

View File

@ -338,8 +338,8 @@ protected:
bool allow_repeated_settings = false;
bool allow_merge_tree_settings = false;
bool cancelled = false;
bool cancelled_printed = false;
std::atomic_bool cancelled = false;
std::atomic_bool cancelled_printed = false;
/// Unpacked descriptors and streams for the ease of use.
int in_fd = STDIN_FILENO;

View File

@ -1,8 +1,6 @@
#pragma once
#include <deque>
#include <type_traits>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <optional>
@ -200,22 +198,18 @@ public:
*/
bool finish()
{
bool was_finished_before = false;
{
std::lock_guard lock(queue_mutex);
if (is_finished)
return true;
was_finished_before = is_finished;
is_finished = true;
}
pop_condition.notify_all();
push_condition.notify_all();
return was_finished_before;
return false;
}
/// Returns if queue is finished

View File

@ -447,14 +447,18 @@ The server successfully detected this situation and will download merged part fr
M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
\
M(AzureGetObject, "Number of Azure API GetObject calls.") \
M(AzureUploadPart, "Number of Azure blob storage API UploadPart calls") \
M(AzureUpload, "Number of Azure blob storage API Upload calls") \
M(AzureStageBlock, "Number of Azure blob storage API StageBlock calls") \
M(AzureCommitBlockList, "Number of Azure blob storage API CommitBlockList calls") \
M(AzureCopyObject, "Number of Azure blob storage API CopyObject calls") \
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \
M(AzureGetProperties, "Number of Azure blob storage API GetProperties calls.") \
\
M(DiskAzureGetObject, "Number of Disk Azure API GetObject calls.") \
M(DiskAzureUploadPart, "Number of Disk Azure blob storage API UploadPart calls") \
M(DiskAzureUpload, "Number of Disk Azure blob storage API Upload calls") \
M(DiskAzureStageBlock, "Number of Disk Azure blob storage API StageBlock calls") \
M(DiskAzureCommitBlockList, "Number of Disk Azure blob storage API CommitBlockList calls") \
M(DiskAzureCopyObject, "Number of Disk Azure blob storage API CopyObject calls") \
M(DiskAzureListObjects, "Number of Disk Azure blob storage API ListObjects calls.") \
M(DiskAzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \

View File

@ -150,12 +150,18 @@
M(S3PutObject) \
M(S3GetObject) \
\
M(AzureUploadPart) \
M(DiskAzureUploadPart) \
M(AzureUpload) \
M(DiskAzureUpload) \
M(AzureStageBlock) \
M(DiskAzureStageBlock) \
M(AzureCommitBlockList) \
M(DiskAzureCommitBlockList) \
M(AzureCopyObject) \
M(DiskAzureCopyObject) \
M(AzureDeleteObjects) \
M(DiskAzureDeleteObjects) \
M(AzureListObjects) \
M(DiskAzureListObjects) \
\
M(DiskS3DeleteObjects) \
M(DiskS3CopyObject) \

View File

@ -334,19 +334,13 @@ void KeeperDispatcher::snapshotThread()
{
setThreadName("KeeperSnpT");
const auto & shutdown_called = keeper_context->isShutdownCalled();
while (!shutdown_called)
CreateSnapshotTask task;
while (snapshots_queue.pop(task))
{
CreateSnapshotTask task;
if (!snapshots_queue.pop(task))
break;
try
{
auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot), /*execute_only_cleanup=*/shutdown_called);
if (shutdown_called)
break;
if (!snapshot_file_info)
continue;

View File

@ -597,7 +597,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot, bool execute_only_cleanup)
{
nuraft::ptr<std::exception> exception(nullptr);
bool ret = true;
bool ret = false;
if (!execute_only_cleanup)
{
try
@ -627,7 +627,8 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
else
{
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(
*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
latest_snapshot_info = std::move(snapshot_info);
latest_snapshot_buf = std::move(snapshot_buf);
}
@ -640,13 +641,14 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
latest_snapshot_info->path);
}
}
ret = true;
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed);
LOG_TRACE(log, "Exception happened during snapshot");
tryLogCurrentException(log);
ret = false;
}
}
{

View File

@ -695,6 +695,7 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(Bool, database_replicated_allow_only_replicated_engine, false, "Allow to create only Replicated tables in database with engine Replicated", 0) \
M(Bool, database_replicated_allow_replicated_engine_arguments, true, "Allow to create only Replicated tables in database with engine Replicated with explicit arguments", 0) \
M(Bool, database_replicated_allow_heavy_create, false, "Allow long-running DDL queries (CREATE AS SELECT and POPULATE) in Replicated database engine. Note that it can block DDL queue for a long time.", 0) \
M(Bool, cloud_mode, false, "Only available in ClickHouse Cloud", 0) \
M(UInt64, cloud_mode_engine, 1, "Only available in ClickHouse Cloud", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result, one of: 'none', 'throw', 'null_status_on_timeout', 'never_throw', 'none_only_active', 'throw_only_active', 'null_status_on_timeout_only_active'", 0) \
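For reference, a hedged sketch of what the new `database_replicated_allow_heavy_create` setting permits (database and table names are hypothetical); it mirrors the shell test added further down in this commit.

```sql
-- Inside a database created with ENGINE = Replicated(...):
SET database_replicated_allow_heavy_create = 1;
CREATE TABLE replicated_db.heavy (id UInt64)
ENGINE = ReplicatedMergeTree ORDER BY id
AS SELECT 1;  -- would throw SUPPORT_IS_DISABLED without the setting
```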

View File

@ -60,6 +60,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
{"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."},
{"database_replicated_allow_heavy_create", true, false, "Long-running DDL queries (CREATE AS SELECT and POPULATE) for Replicated database engine was forbidden"},
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},

View File

@ -14,6 +14,15 @@ namespace ProfileEvents
{
extern const Event RemoteWriteThrottlerBytes;
extern const Event RemoteWriteThrottlerSleepMicroseconds;
extern const Event AzureUpload;
extern const Event AzureStageBlock;
extern const Event AzureCommitBlockList;
extern const Event DiskAzureUpload;
extern const Event DiskAzureStageBlock;
extern const Event DiskAzureCommitBlockList;
}
namespace DB
@ -134,6 +143,10 @@ void WriteBufferFromAzureBlobStorage::preFinalize()
/// then we use single part upload instead of multi part upload
if (block_ids.empty() && detached_part_data.size() == 1 && detached_part_data.front().data_size <= max_single_part_upload_size)
{
ProfileEvents::increment(ProfileEvents::AzureUpload);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureUpload);
auto part_data = std::move(detached_part_data.front());
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(part_data.memory.data()), part_data.data_size);
@ -164,6 +177,10 @@ void WriteBufferFromAzureBlobStorage::finalizeImpl()
if (!block_ids.empty())
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
ProfileEvents::increment(ProfileEvents::AzureCommitBlockList);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCommitBlockList);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, max_unexpected_write_error_retries);
LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
}
@ -269,6 +286,10 @@ void WriteBufferFromAzureBlobStorage::writePart(WriteBufferFromAzureBlobStorage:
auto & data_block_id = std::get<0>(*worker_data);
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
ProfileEvents::increment(ProfileEvents::AzureStageBlock);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureStageBlock);
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(std::get<1>(*worker_data).memory.data()), data_size);
execWithRetry([&](){ block_blob_client.StageBlock(data_block_id, memory_stream); }, max_unexpected_write_error_retries, data_size);
};

View File

@ -75,6 +75,7 @@ struct RelativePathWithMetadata
virtual std::string getPath() const { return relative_path; }
virtual bool isArchive() const { return false; }
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
};
struct ObjectKeyWithMetadata

View File

@ -11,20 +11,6 @@ namespace
/* Bit layouts of UUIDv7
without counter:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
| unix_ts_ms |
| unix_ts_ms | ver | rand_a |
|var| rand_b |
| rand_b |
with counter:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1

View File

@ -16,10 +16,12 @@
namespace ProfileEvents
{
extern const Event AzureCopyObject;
extern const Event AzureUploadPart;
extern const Event AzureStageBlock;
extern const Event AzureCommitBlockList;
extern const Event DiskAzureCopyObject;
extern const Event DiskAzureUploadPart;
extern const Event DiskAzureStageBlock;
extern const Event DiskAzureCommitBlockList;
}
@ -156,6 +158,10 @@ namespace
void completeMultipartUpload()
{
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
ProfileEvents::increment(ProfileEvents::AzureCommitBlockList);
if (client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCommitBlockList);
block_blob_client.CommitBlockList(block_ids);
}
@ -259,9 +265,9 @@ namespace
void processUploadPartRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::AzureUploadPart);
ProfileEvents::increment(ProfileEvents::AzureStageBlock);
if (client->GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureUploadPart);
ProfileEvents::increment(ProfileEvents::DiskAzureStageBlock);
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), task.part_offset, task.part_size);
@ -333,7 +339,6 @@ void copyAzureBlobStorageFile(
const ReadSettings & read_settings,
ThreadPoolCallbackRunnerUnsafe<void> schedule)
{
if (settings->use_native_copy)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);

View File

@ -1328,7 +1328,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (need_add_to_database)
database = DatabaseCatalog::instance().tryGetDatabase(database_name);
if (database && database->getEngineName() == "Replicated" && create.select)
bool allow_heavy_create = getContext()->getSettingsRef().database_replicated_allow_heavy_create;
if (!allow_heavy_create && database && database->getEngineName() == "Replicated" && (create.select || create.is_populate))
{
bool is_storage_replicated = false;
if (create.storage && create.storage->engine)
@ -1338,11 +1339,12 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
is_storage_replicated = true;
}
const bool allow_create_select_for_replicated = create.isView() || create.is_create_empty || !is_storage_replicated;
const bool allow_create_select_for_replicated = (create.isView() && !create.is_populate) || create.is_create_empty || !is_storage_replicated;
if (!allow_create_select_for_replicated)
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"CREATE AS SELECT is not supported with Replicated databases. Use separate CREATE and INSERT queries");
"CREATE AS SELECT and POPULATE is not supported with Replicated databases. Consider using separate CREATE and INSERT queries. "
"Alternatively, you can enable 'database_replicated_allow_heavy_create' setting to allow this operation, use with caution");
}
if (database && database->shouldReplicateQuery(getContext(), query_ptr))

View File

@ -383,7 +383,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size_copy, reader_settings);
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
pipes.emplace_back(std::move(source));
}
@ -482,7 +482,7 @@ Pipe ReadFromMergeTree::readFromPool(
pool, std::move(algorithm), prewhere_info,
actions_settings, block_size_copy, reader_settings);
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
if (i == 0)
source->addTotalRowsApprox(total_rows);
@ -595,7 +595,7 @@ Pipe ReadFromMergeTree::readInOrder(
processor->addPartLevelToChunk(isQueryWithFinal());
auto source = std::make_shared<MergeTreeSource>(std::move(processor));
auto source = std::make_shared<MergeTreeSource>(std::move(processor), data.getLogName());
if (set_total_rows_approx)
source->addTotalRowsApprox(total_rows);

View File

@ -133,9 +133,8 @@ private:
};
#endif
MergeTreeSource::MergeTreeSource(MergeTreeSelectProcessorPtr processor_)
: ISource(processor_->getHeader())
, processor(std::move(processor_))
MergeTreeSource::MergeTreeSource(MergeTreeSelectProcessorPtr processor_, const std::string & log_name_)
: ISource(processor_->getHeader()), processor(std::move(processor_)), log_name(log_name_)
{
#if defined(OS_LINUX)
if (processor->getSettings().use_asynchronous_read_from_pool)
@ -207,7 +206,7 @@ std::optional<Chunk> MergeTreeSource::tryGenerate()
try
{
OpenTelemetry::SpanHolder span{"MergeTreeSource::tryGenerate()"};
OpenTelemetry::SpanHolder span{fmt::format("MergeTreeSource({})::tryGenerate", log_name)};
holder->setResult(processor->read());
}
catch (...)
@ -222,7 +221,7 @@ std::optional<Chunk> MergeTreeSource::tryGenerate()
}
#endif
OpenTelemetry::SpanHolder span{"MergeTreeSource::tryGenerate()"};
OpenTelemetry::SpanHolder span{fmt::format("MergeTreeSource({})::tryGenerate", log_name)};
return processReadResult(processor->read());
}

View File

@ -12,7 +12,7 @@ struct ChunkAndProgress;
class MergeTreeSource final : public ISource
{
public:
explicit MergeTreeSource(MergeTreeSelectProcessorPtr processor_);
explicit MergeTreeSource(MergeTreeSelectProcessorPtr processor_, const std::string & log_name_);
~MergeTreeSource() override;
std::string getName() const override;
@ -30,6 +30,7 @@ protected:
private:
MergeTreeSelectProcessorPtr processor;
const std::string log_name;
#if defined(OS_LINUX)
struct AsyncReadingState;

View File

@ -196,13 +196,12 @@ Chunk StorageObjectStorageSource::generate()
const auto & filename = object_info->getFileName();
chassert(object_info->metadata);
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, read_from_format_info.requested_virtual_columns,
{
.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info->metadata->last_modified
});
chunk,
read_from_format_info.requested_virtual_columns,
{.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info->metadata->last_modified});
const auto & partition_columns = configuration->getPartitionColumns();
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())
@ -227,7 +226,6 @@ Chunk StorageObjectStorageSource::generate()
chunk.addColumn(std::move(partition_column));
}
}
return chunk;
}
@ -715,10 +713,9 @@ static IArchiveReader::NameFilter createArchivePathFilter(const std::string & ar
StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive::ObjectInfoInArchive(
ObjectInfoPtr archive_object_,
const std::string & path_in_archive_,
std::shared_ptr<IArchiveReader> archive_reader_)
: archive_object(archive_object_)
, path_in_archive(path_in_archive_)
, archive_reader(archive_reader_)
std::shared_ptr<IArchiveReader> archive_reader_,
IArchiveReader::FileInfo && file_info_)
: archive_object(archive_object_), path_in_archive(path_in_archive_), archive_reader(archive_reader_), file_info(file_info_)
{
}
@ -757,6 +754,7 @@ StorageObjectStorageSource::ObjectInfoPtr
StorageObjectStorageSource::ArchiveIterator::nextImpl(size_t processor)
{
std::unique_lock lock{next_mutex};
IArchiveReader::FileInfo current_file_info{};
while (true)
{
if (filter)
@ -781,6 +779,8 @@ StorageObjectStorageSource::ArchiveIterator::nextImpl(size_t processor)
path_in_archive = file_enumerator->getFileName();
if (!filter(path_in_archive))
continue;
else
current_file_info = file_enumerator->getFileInfo();
}
else
{
@ -794,15 +794,19 @@ StorageObjectStorageSource::ArchiveIterator::nextImpl(size_t processor)
archive_reader = createArchiveReader(archive_object);
if (!archive_reader->fileExists(path_in_archive))
continue;
else
current_file_info = archive_reader->getFileInfo(path_in_archive);
}
auto object_in_archive = std::make_shared<ObjectInfoInArchive>(archive_object, path_in_archive, archive_reader);
if (read_keys != nullptr)
read_keys->push_back(object_in_archive);
return object_in_archive;
break;
}
auto object_in_archive
= std::make_shared<ObjectInfoInArchive>(archive_object, path_in_archive, archive_reader, std::move(current_file_info));
if (read_keys != nullptr)
read_keys->push_back(object_in_archive);
return object_in_archive;
}
size_t StorageObjectStorageSource::ArchiveIterator::estimatedKeysCount()

View File

@ -260,7 +260,8 @@ public:
ObjectInfoInArchive(
ObjectInfoPtr archive_object_,
const std::string & path_in_archive_,
std::shared_ptr<IArchiveReader> archive_reader_);
std::shared_ptr<IArchiveReader> archive_reader_,
IArchiveReader::FileInfo && file_info_);
std::string getFileName() const override
{
@ -279,9 +280,12 @@ public:
bool isArchive() const override { return true; }
size_t fileSizeInArchive() const override { return file_info.uncompressed_size; }
const ObjectInfoPtr archive_object;
const std::string path_in_archive;
const std::shared_ptr<IArchiveReader> archive_reader;
const IArchiveReader::FileInfo file_info;
};
private:

View File

@ -395,11 +395,14 @@ void registerStorageJoin(StorageFactory & factory)
else if (kind_str == "full")
{
if (strictness == JoinStrictness::Any)
strictness = JoinStrictness::RightAny;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ANY FULL JOINs are not implemented");
kind = JoinKind::Full;
}
}
if ((strictness == JoinStrictness::Semi || strictness == JoinStrictness::Anti) && (kind != JoinKind::Left && kind != JoinKind::Right))
throw Exception(ErrorCodes::BAD_ARGUMENTS, " SEMI|ANTI JOIN should be LEFT or RIGHT");
if (kind == JoinKind::Comma)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).");

View File

@ -40,6 +40,16 @@ create_table_sql_template = """
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
create_table_sql_nullable_template = """
CREATE TABLE `clickhouse`.`{}` (
`id` integer not null,
`col1` integer,
`col2` decimal(15,10),
`col3` varchar(32),
`col4` datetime
)
"""
def skip_test_msan(instance):
if instance.is_built_with_memory_sanitizer():
@ -77,6 +87,11 @@ def create_mysql_db(conn, name):
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_nullable_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_nullable_template.format(table_name))
def create_mysql_table(conn, table_name):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(table_name))
@ -192,6 +207,46 @@ def started_cluster():
cluster.shutdown()
def test_mysql_odbc_select_nullable(started_cluster):
skip_test_msan(node1)
mysql_setup = node1.odbc_drivers["MySQL"]
table_name = "test_insert_nullable_select"
conn = get_mysql_conn()
create_mysql_nullable_table(conn, table_name)
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO clickhouse.{} VALUES(1, 1, 1.23456, 'data1', '2010-01-01 00:00:00');".format(
table_name
)
)
cursor.execute(
"INSERT INTO clickhouse.{} VALUES(2, NULL, NULL, NULL, NULL);".format(
table_name
)
)
conn.commit()
node1.query(
"""
CREATE TABLE {}(id UInt32, col1 Nullable(UInt32), col2 Nullable(Decimal(15, 10)), col3 Nullable(String), col4 Nullable(DateTime)) ENGINE = ODBC('DSN={}', 'clickhouse', '{}');
""".format(
table_name, mysql_setup["DSN"], table_name
)
)
assert (
node1.query(
"SELECT id, col1, col2, col3, col4 from {} order by id asc".format(
table_name
)
)
== "1\t1\t1.23456\tdata1\t2010-01-01 00:00:00\n2\t\\N\t\\N\t\\N\t\\N\n"
)
drop_mysql_table(conn, table_name)
conn.close()
def test_mysql_simple_select_works(started_cluster):
skip_test_msan(node1)

View File

@ -9,8 +9,17 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "CREATE DATABASE ${CLICKHOUSE_DATABASE}_db engine = Replicated('/clickhouse/databases/${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}/${CLICKHOUSE_DATABASE}_db', '{shard}', '{replica}')"
# Non-replicated engines are allowed
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test (id UInt64) ENGINE = MergeTree() ORDER BY id AS SELECT 1"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv (id UInt64) ENGINE = MergeTree() ORDER BY id POPULATE AS SELECT 1"
# Replicated storages are forbidden
${CLICKHOUSE_CLIENT} --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test2', '1') ORDER BY id AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED"
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" |& grep -cm1 "SUPPORT_IS_DISABLED"
# But it is allowed with the special setting
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE TABLE ${CLICKHOUSE_DATABASE}_db.test2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id AS SELECT 1" --database_replicated_allow_heavy_create=1
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none --query "CREATE MATERIALIZED VIEW ${CLICKHOUSE_DATABASE}_db.test_mv2 (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id POPULATE AS SELECT 1" --database_replicated_allow_heavy_create=1
${CLICKHOUSE_CLIENT} --query "DROP DATABASE ${CLICKHOUSE_DATABASE}_db"

View File

@ -1,52 +1,52 @@
1 Str1 example1.csv test/03036_archive1.zip::example1.csv
2 Str2 example1.csv test/03036_archive1.zip::example1.csv
3 Str3 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 example2.csv test/03036_archive2.zip::example2.csv
5 Str5 example3.csv test/03036_archive2.zip::example3.csv
6 Str6 example3.csv test/03036_archive2.zip::example3.csv
3 Str3 example2.csv test/03036_archive1.zip::example2.csv
3 Str3 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 example2.csv test/03036_archive1.zip::example2.csv
4 Str4 example2.csv test/03036_archive2.zip::example2.csv
1 Str1 example1.csv test/03036_archive1.zip::example1.csv
2 Str2 example1.csv test/03036_archive1.zip::example1.csv
3 Str3 example2.csv test/03036_archive1.zip::example2.csv
3 Str3 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 example2.csv test/03036_archive1.zip::example2.csv
4 Str4 example2.csv test/03036_archive2.zip::example2.csv
5 Str5 example3.csv test/03036_archive2.zip::example3.csv
6 Str6 example3.csv test/03036_archive2.zip::example3.csv
1 Str1 example1.csv test/03036_archive1.tar::example1.csv
2 Str2 example1.csv test/03036_archive1.tar::example1.csv
7 Str7 example4.csv test/03036_archive1.tar::example4.csv
7 Str7 example4.csv test/03036_archive2.tar::example4.csv
8 Str8 example4.csv test/03036_archive1.tar::example4.csv
8 Str8 example4.csv test/03036_archive2.tar::example4.csv
5 Str5 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 example3.csv test/03036_archive2.tar::example3.csv
7 Str7 example4.csv test/03036_archive2.tar::example4.csv
8 Str8 example4.csv test/03036_archive2.tar::example4.csv
9 Str9 example5.csv test/03036_archive2.tar::example5.csv
10 Str10 example5.csv test/03036_archive2.tar::example5.csv
3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv
4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv
11 Str11 example6.csv test/03036_archive3.tar.gz::example6.csv
12 Str12 example6.csv test/03036_archive3.tar.gz::example6.csv
3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv
4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv
5 Str5 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 example3.csv test/03036_archive2.tar::example3.csv
3 Str3 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 example2.csv test/03036_archive2.zip::example2.csv
5 Str5 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 example3.csv test/03036_archive2.tar::example3.csv
7 Str7 example4.csv test/03036_archive2.tar::example4.csv
8 Str8 example4.csv test/03036_archive2.tar::example4.csv
9 Str9 example5.csv test/03036_archive2.tar::example5.csv
10 Str10 example5.csv test/03036_archive2.tar::example5.csv
3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv
4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv
5 Str5 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 example3.csv test/03036_archive2.tar::example3.csv
13 Str13 example7.csv test/03036_compressed_file_archive.zip::example7.csv
14 Str14 example7.csv test/03036_compressed_file_archive.zip::example7.csv
1 Str1 25 example1.csv test/03036_archive1.zip::example1.csv
2 Str2 25 example1.csv test/03036_archive1.zip::example1.csv
3 Str3 25 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 25 example2.csv test/03036_archive2.zip::example2.csv
5 Str5 25 example3.csv test/03036_archive2.zip::example3.csv
6 Str6 25 example3.csv test/03036_archive2.zip::example3.csv
3 Str3 25 example2.csv test/03036_archive1.zip::example2.csv
3 Str3 25 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 25 example2.csv test/03036_archive1.zip::example2.csv
4 Str4 25 example2.csv test/03036_archive2.zip::example2.csv
1 Str1 25 example1.csv test/03036_archive1.zip::example1.csv
2 Str2 25 example1.csv test/03036_archive1.zip::example1.csv
3 Str3 25 example2.csv test/03036_archive1.zip::example2.csv
3 Str3 25 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 25 example2.csv test/03036_archive1.zip::example2.csv
4 Str4 25 example2.csv test/03036_archive2.zip::example2.csv
5 Str5 25 example3.csv test/03036_archive2.zip::example3.csv
6 Str6 25 example3.csv test/03036_archive2.zip::example3.csv
1 Str1 25 example1.csv test/03036_archive1.tar::example1.csv
2 Str2 25 example1.csv test/03036_archive1.tar::example1.csv
7 Str7 25 example4.csv test/03036_archive1.tar::example4.csv
7 Str7 25 example4.csv test/03036_archive2.tar::example4.csv
8 Str8 25 example4.csv test/03036_archive1.tar::example4.csv
8 Str8 25 example4.csv test/03036_archive2.tar::example4.csv
5 Str5 25 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 25 example3.csv test/03036_archive2.tar::example3.csv
7 Str7 25 example4.csv test/03036_archive2.tar::example4.csv
8 Str8 25 example4.csv test/03036_archive2.tar::example4.csv
9 Str9 27 example5.csv test/03036_archive2.tar::example5.csv
10 Str10 27 example5.csv test/03036_archive2.tar::example5.csv
3 Str3 25 example2.csv test/03036_archive3.tar.gz::example2.csv
4 Str4 25 example2.csv test/03036_archive3.tar.gz::example2.csv
11 Str11 29 example6.csv test/03036_archive3.tar.gz::example6.csv
12 Str12 29 example6.csv test/03036_archive3.tar.gz::example6.csv
3 Str3 25 example2.csv test/03036_archive3.tar.gz::example2.csv
4 Str4 25 example2.csv test/03036_archive3.tar.gz::example2.csv
5 Str5 25 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 25 example3.csv test/03036_archive2.tar::example3.csv
3 Str3 25 example2.csv test/03036_archive2.zip::example2.csv
4 Str4 25 example2.csv test/03036_archive2.zip::example2.csv
5 Str5 25 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 25 example3.csv test/03036_archive2.tar::example3.csv
7 Str7 25 example4.csv test/03036_archive2.tar::example4.csv
8 Str8 25 example4.csv test/03036_archive2.tar::example4.csv
9 Str9 27 example5.csv test/03036_archive2.tar::example5.csv
10 Str10 27 example5.csv test/03036_archive2.tar::example5.csv
3 Str3 25 example2.csv test/03036_archive3.tar.gz::example2.csv
4 Str4 25 example2.csv test/03036_archive3.tar.gz::example2.csv
5 Str5 25 example3.csv test/03036_archive2.tar::example3.csv
6 Str6 25 example3.csv test/03036_archive2.tar::example3.csv
13 Str13 57 example7.csv test/03036_compressed_file_archive.zip::example7.csv
14 Str14 57 example7.csv test/03036_compressed_file_archive.zip::example7.csv

View File

@ -1,22 +1,22 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive1.zip :: example1.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive2.zip :: example*.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example2.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example*') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive1.tar :: example1.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar :: example4.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive2.tar :: example*.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar.gz :: example*.csv') ORDER BY (id, _file, _path);
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv') ORDER BY (id, _file, _path);
select id, data, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
select id, data, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent{2..3}.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive1.zip :: example1.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive2.zip :: example*.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example2.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example*') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive1.tar :: example1.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar :: example4.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive2.tar :: example*.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar.gz :: example*.csv') ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv') ORDER BY (id, _file, _path);
select id, data, _size, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
select id, data, _size, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent{2..3}.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE }
CREATE TABLE table_zip22 Engine S3(s3_conn, filename='03036_archive2.zip :: example2.csv');
select id, data, _file, _path from table_zip22 ORDER BY (id, _file, _path);
select id, data, _size, _file, _path from table_zip22 ORDER BY (id, _file, _path);
CREATE table table_tar2star Engine S3(s3_conn, filename='03036_archive2.tar :: example*.csv');
SELECT id, data, _file, _path FROM table_tar2star ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM table_tar2star ORDER BY (id, _file, _path);
CREATE table table_tarstarglobs Engine S3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv');
SELECT id, data, _file, _path FROM table_tarstarglobs ORDER BY (id, _file, _path);
SELECT id, data, _size, _file, _path FROM table_tarstarglobs ORDER BY (id, _file, _path);
CREATE table table_noexist Engine s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError UNKNOWN_STORAGE }
SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_compressed_file_archive.zip :: example7.csv', format='CSV', structure='auto', compression_method='gz') ORDER BY (id, _file, _path)
SELECT id, data, _size, _file, _path FROM s3(s3_conn, filename='03036_compressed_file_archive.zip :: example7.csv', format='CSV', structure='auto', compression_method='gz') ORDER BY (id, _file, _path)

View File

@ -0,0 +1,42 @@
DROP TABLE IF EXISTS t1;
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(SEMI, ALL, a); -- { serverError BAD_ARGUMENTS }
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(SEMI, INNER, a); -- { serverError BAD_ARGUMENTS }
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(SEMI, FULL, a); -- { serverError BAD_ARGUMENTS }
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(ANTI, ALL, a); -- { serverError BAD_ARGUMENTS }
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(ANTI, INNER, a); -- { serverError BAD_ARGUMENTS }
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(ANTI, FULL, a); -- { serverError BAD_ARGUMENTS }
CREATE TABLE t1
(
a Int64,
b Int64
) Engine = Join(ANY, FULL, a); -- { serverError NOT_IMPLEMENTED }