Merge branch 'master' into fix-hdfs-virtual-columns

Kseniia Sumarokova 2021-04-20 22:01:32 +03:00 committed by GitHub
commit f9291d7624
57 changed files with 727 additions and 158 deletions

View File

@ -159,9 +159,9 @@ public:
*/
Pool(const std::string & db_,
const std::string & server_,
const std::string & user_ = "",
const std::string & password_ = "",
unsigned port_ = 0,
const std::string & user_,
const std::string & password_,
unsigned port_,
const std::string & socket_ = "",
unsigned connect_timeout_ = MYSQLXX_DEFAULT_TIMEOUT,
unsigned rw_timeout_ = MYSQLXX_DEFAULT_RW_TIMEOUT,

View File

@ -17,6 +17,9 @@
<!-- One NUMA node w/o hyperthreading -->
<max_threads>12</max_threads>
<!-- mmap shows some improvements in perf tests -->
<min_bytes_to_use_mmap_io>64Mi</min_bytes_to_use_mmap_io>
</default>
</profiles>
<users>

View File

@ -104,6 +104,12 @@ clickhouse-client -q "system flush logs" ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
clickhouse-client -q "select * from system.query_log format TSVWithNamesAndTypes" | pigz > /test_output/query-log.tsv.gz &
clickhouse-client -q "select * from system.query_thread_log format TSVWithNamesAndTypes" | pigz > /test_output/query-thread-log.tsv.gz &
clickhouse-client --allow_introspection_functions=1 -q "
WITH
arrayMap(x -> concat(demangle(addressToSymbol(x)), ':', addressToLine(x)), trace) AS trace_array,
arrayStringConcat(trace_array, '\n') AS trace_string
SELECT * EXCEPT(trace), trace_string FROM system.trace_log FORMAT TSVWithNamesAndTypes
" | pigz > /test_output/trace-log.tsv.gz &
wait ||:
mv /var/log/clickhouse-server/stderr.log /test_output/ ||:

View File

@ -136,6 +136,7 @@ pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhous
tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
mv /var/log/clickhouse-server/stderr.log /test_output/
tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||:
tar -chf /test_output/trace_log_dump.tar /var/lib/clickhouse/data/system/trace_log ||:
# Write check result into check_status.tsv
clickhouse-local --structure "test String, res String" -q "SELECT 'failure', test FROM table WHERE res != 'OK' order by (lower(test) like '%hung%') LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv

View File

@ -6,7 +6,7 @@ toc_priority: 207
Computes an approximate [quantile](https://en.wikipedia.org/wiki/Quantile) of a numeric data sequence using the [t-digest](https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf) algorithm.
The maximum error is 1%. Memory consumption is `log(n)`, where `n` is a number of values. The result depends on the order of running the query, and is nondeterministic.
Memory consumption is `log(n)`, where `n` is a number of values. The result depends on the order of running the query, and is nondeterministic.
The performance of the function is lower than performance of [quantile](../../../sql-reference/aggregate-functions/reference/quantile.md#quantile) or [quantileTiming](../../../sql-reference/aggregate-functions/reference/quantiletiming.md#quantiletiming). In terms of the ratio of State size to precision, this function is much better than `quantile`.

View File

@ -181,7 +181,7 @@ std::vector<std::pair<String, uint16_t>> parseRemoteDescriptionForExternalDataba
size_t colon = address.find(':');
if (colon == String::npos)
{
LOG_WARNING(&Poco::Logger::get("ParseRemoteDescription"), "Port is not found for host: {}. Using default port {}", default_port);
LOG_WARNING(&Poco::Logger::get("ParseRemoteDescription"), "Port is not found for host: {}. Using default port {}", address, default_port);
result.emplace_back(std::make_pair(address, default_port));
}
else

View File

@ -70,6 +70,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
@ -142,7 +143,7 @@ class IColumn;
M(UInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ", 0) \
\
M(UInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for reading the data with O_DIRECT option during SELECT queries execution. 0 - disabled.", 0) \
M(UInt64, min_bytes_to_use_mmap_io, (64 * 1024 * 1024), "The minimum number of bytes for reading the data with mmap option during SELECT queries execution. 0 - disabled.", 0) \
M(UInt64, min_bytes_to_use_mmap_io, 0, "The minimum number of bytes for reading the data with mmap option during SELECT queries execution. 0 - disabled.", 0) \
M(Bool, checksum_on_read, true, "Validate checksums on reading. It is enabled by default and should be always enabled in production. Please do not expect any benefits in disabling this setting. It may only be used for experiments and benchmarks. The setting only applicable for tables of MergeTree family. Checksums are always validated for other table engines and when receiving data over network.", 0) \
\
M(Bool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.", 0) \

View File

@ -365,8 +365,8 @@ void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
/// 4. INSERT INTO table ...; (both Storage instances write data without any synchronization)
/// To avoid it, we remember UUIDs of detached tables and do not allow ATTACH of a table with such UUID while the detached instance is still in use.
if (detached_tables.count(uuid))
throw Exception("Cannot attach table with UUID " + toString(uuid) +
", because it was detached but still used by some query. Retry later.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Cannot attach table with UUID {}, "
"because it was detached but still used by some query. Retry later.", toString(uuid));
}
void DatabaseAtomic::setDetachedTableNotInUseForce(const UUID & uuid)
@ -573,12 +573,6 @@ void DatabaseAtomic::renameDictionaryInMemoryUnlocked(const StorageID & old_name
}
void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid)
{
{
std::lock_guard lock{mutex};
if (detached_tables.count(uuid) == 0)
return;
}
/// Table is in use while its shared_ptr counter is greater than 1.
/// We cannot trigger condvar on shared_ptr destruction, so it's busy wait.
while (true)
@ -594,5 +588,13 @@ void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid)
}
}
void DatabaseAtomic::checkDetachedTableNotInUse(const UUID & uuid)
{
DetachedTables not_in_use;
std::lock_guard lock{mutex};
not_in_use = cleanupDetachedTables();
assertDetachedTableNotInUse(uuid);
}
}
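
The comment in waitDetachedTableNotInUse above describes the technique: a detached table counts as "in use" while more than one shared_ptr reference to it exists, and since there is no way to hook shared_ptr destruction, the waiter has to poll. A minimal, self-contained sketch of that busy wait (hypothetical names, not part of this commit):

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

/// Hypothetical stand-in for a detached table object.
struct Table { int data = 0; };

/// Poll until `table` is referenced only by this local shared_ptr (use_count() == 1),
/// i.e. no query holds the old instance anymore. Returns false on timeout.
bool waitNotInUse(const std::shared_ptr<Table> & table, std::chrono::seconds timeout)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (table.use_count() > 1)
    {
        if (std::chrono::steady_clock::now() > deadline)
            return false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return true;
}

int main()
{
    auto table = std::make_shared<Table>();
    std::cout << waitNotInUse(table, std::chrono::seconds(1)) << '\n';  /// prints 1: only one reference exists
}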

View File

@ -58,6 +58,7 @@ public:
void tryRemoveSymlink(const String & table_name);
void waitDetachedTableNotInUse(const UUID & uuid) override;
void checkDetachedTableNotInUse(const UUID & uuid) override;
void setDetachedTableNotInUseForce(const UUID & uuid);
protected:

View File

@ -158,7 +158,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const auto & [remote_host_name, remote_port] = parseAddress(host_port, 3306);
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password);
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
auto materialize_mode_settings = std::make_unique<MaterializeMySQLSettings>();

View File

@ -169,11 +169,22 @@ void DatabaseWithDictionaries::createDictionary(ContextPtr local_context, const
}
bool succeeded = false;
bool uuid_locked = false;
SCOPE_EXIT({
if (!succeeded)
{
if (uuid_locked)
DatabaseCatalog::instance().removeUUIDMappingFinally(dict_id.uuid);
Poco::File(dictionary_metadata_tmp_path).remove();
}
});
if (dict_id.uuid != UUIDHelpers::Nil)
{
DatabaseCatalog::instance().addUUIDMapping(dict_id.uuid);
uuid_locked = true;
}
/// Add a temporary repository containing the dictionary.
/// We need this temp repository to try loading the dictionary before actually attaching it to the database.
auto temp_repository = external_loader.addConfigRepository(std::make_unique<ExternalLoaderTempConfigRepository>(
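
The SCOPE_EXIT block added above implements cleanup-on-failure: if the function exits before succeeded is set, the UUID mapping is released and the temporary metadata file is removed. A minimal sketch of the same pattern with a hypothetical ScopeGuard class (the real code uses ClickHouse's SCOPE_EXIT macro):

#include <cstdio>
#include <functional>
#include <stdexcept>
#include <utility>

/// Minimal scope guard: runs the stored callback when the enclosing scope ends.
class ScopeGuard
{
public:
    explicit ScopeGuard(std::function<void()> fn_) : fn(std::move(fn_)) {}
    ~ScopeGuard() { if (fn) fn(); }
    ScopeGuard(const ScopeGuard &) = delete;
    ScopeGuard & operator=(const ScopeGuard &) = delete;
private:
    std::function<void()> fn;
};

void createObjectWithRollback(bool will_fail)
{
    bool succeeded = false;
    ScopeGuard cleanup([&]
    {
        if (!succeeded)
            std::puts("rolling back partial state (remove temp file, release UUID mapping, ...)");
    });

    if (will_fail)
        throw std::runtime_error("creation failed");

    succeeded = true;  /// reached only on success; the guard then does nothing
}

int main()
{
    try { createObjectWithRollback(true); } catch (...) { }  /// prints the rollback message
    createObjectWithRollback(false);                         /// prints nothing
}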

View File

@ -345,7 +345,8 @@ public:
virtual void assertCanBeDetached(bool /*cleanup*/) {}
virtual void waitDetachedTableNotInUse(const UUID & /*uuid*/) { assert(false); }
virtual void waitDetachedTableNotInUse(const UUID & /*uuid*/) { }
virtual void checkDetachedTableNotInUse(const UUID & /*uuid*/) { }
/// Ask all tables to complete the background threads they are using and delete all table objects.
virtual void shutdown() = 0;

View File

@ -72,11 +72,15 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length,"
" NUMERIC_PRECISION as '',"
" NUMERIC_PRECISION as numeric_precision,"
" IF(ISNULL(NUMERIC_SCALE), DATETIME_PRECISION, NUMERIC_SCALE) AS scale" // we know DATETIME_PRECISION as a scale in CH
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << database_name
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
" WHERE ";
if (!database_name.empty())
query << " TABLE_SCHEMA = " << quote << database_name << " AND ";
query << " TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
StreamSettings mysql_input_stream_settings(settings);
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, mysql_input_stream_settings);

View File

@ -250,8 +250,12 @@ class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, size_t buf_size_)
: client_ptr(std::move(client_ptr_)), bucket(bucket_), metadata(std::move(metadata_)), buf_size(buf_size_)
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, DiskS3::Metadata metadata_, UInt64 s3_max_single_read_retries_, size_t buf_size_)
: client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, metadata(std::move(metadata_))
, s3_max_single_read_retries(s3_max_single_read_retries_)
, buf_size(buf_size_)
{
}
@ -307,7 +311,7 @@ private:
const auto & [path, size] = metadata.s3_objects[i];
if (size > offset)
{
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
auto buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size);
buf->seek(offset, SEEK_SET);
return buf;
}
@ -336,7 +340,7 @@ private:
++current_buf_idx;
const auto & path = metadata.s3_objects[current_buf_idx].first;
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, buf_size);
current_buf = std::make_unique<ReadBufferFromS3>(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size);
current_buf->next();
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
@ -347,6 +351,7 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
const String & bucket;
DiskS3::Metadata metadata;
UInt64 s3_max_single_read_retries;
size_t buf_size;
size_t absolute_position = 0;
@ -560,6 +565,7 @@ DiskS3::DiskS3(
String bucket_,
String s3_root_path_,
String metadata_path_,
UInt64 s3_max_single_read_retries_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
@ -573,6 +579,7 @@ DiskS3::DiskS3(
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
, s3_max_single_read_retries(s3_max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
@ -679,7 +686,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.s3_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromS3>(client, bucket, metadata, s3_max_single_read_retries, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
}
@ -979,7 +986,7 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
return version;
ReadBufferFromS3 buffer (client, source_bucket, source_path + SCHEMA_VERSION_OBJECT);
ReadBufferFromS3 buffer(client, source_bucket, source_path + SCHEMA_VERSION_OBJECT, s3_max_single_read_retries);
readIntText(version, buffer);
return version;

View File

@ -41,6 +41,7 @@ public:
String bucket_,
String s3_root_path_,
String metadata_path_,
UInt64 s3_max_single_read_retries_,
size_t min_upload_part_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_,
@ -185,6 +186,7 @@ private:
const String bucket;
const String s3_root_path;
String metadata_path;
UInt64 s3_max_single_read_retries;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;

View File

@ -161,6 +161,7 @@ void registerDiskS3(DiskFactory & factory)
uri.bucket,
uri.key,
metadata_path,
context->getSettingsRef().s3_max_single_read_retries,
context->getSettingsRef().s3_min_upload_part_size,
context->getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),

View File

@ -45,6 +45,7 @@ void registerFunctionsUnixTimestamp64(FunctionFactory & factory);
void registerFunctionBitHammingDistance(FunctionFactory & factory);
void registerFunctionTupleHammingDistance(FunctionFactory & factory);
void registerFunctionsStringHash(FunctionFactory & factory);
void registerFunctionValidateNestedArraySizes(FunctionFactory & factory);
#if !defined(ARCADIA_BUILD)
void registerFunctionBayesAB(FunctionFactory &);
#endif
@ -103,6 +104,7 @@ void registerFunctions()
registerFunctionBitHammingDistance(factory);
registerFunctionTupleHammingDistance(factory);
registerFunctionsStringHash(factory);
registerFunctionValidateNestedArraySizes(factory);
#if !defined(ARCADIA_BUILD)
registerFunctionBayesAB(factory);

View File

@ -0,0 +1,113 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Core/ColumnWithTypeAndName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
}
/** Function validateNestedArraySizes checks the consistency of the offsets of Nested DataType subcolumns during UPDATE.
* Arguments: more than two.
* The first argument is the WHERE condition of the UPDATE operation; the check is performed only for rows where it is true.
* The remaining arguments are the subcolumns of the Nested DataType.
*/
class FunctionValidateNestedArraySizes : public IFunction
{
public:
static constexpr auto name = "validateNestedArraySizes";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionValidateNestedArraySizes>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override;
};
DataTypePtr FunctionValidateNestedArraySizes::getReturnTypeImpl(const DataTypes & arguments) const
{
size_t num_args = arguments.size();
if (num_args < 3)
throw Exception(
"Function " + getName() + " needs more than two arguments; passed " + toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!WhichDataType(arguments[0]).isUInt8())
throw Exception("Illegal type " + arguments[0]->getName() + " of the first argument of function " + getName() + ". Must be UInt8.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
for (size_t i = 1; i < num_args; ++i)
if (!WhichDataType(arguments[i]).isArray())
throw Exception(
"Illegal type " + arguments[i]->getName() + " of argument " + toString(i) + " of function " + getName() + ". Must be Array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr FunctionValidateNestedArraySizes::executeImpl(
const ColumnsWithTypeAndName & arguments, const DataTypePtr & /*result_type*/, size_t input_rows_count) const
{
const ColumnUInt8 * condition_column = typeid_cast<const ColumnUInt8 *>(arguments[0].column.get());
size_t args_num = arguments.size();
for (size_t i = 0; i < input_rows_count; ++i)
{
if (!condition_column->getData()[i])
continue;
/// The condition is true, so check that this row has the same array size in all subcolumns of the Nested type
size_t first_length = 0;
size_t length = 0;
for (size_t args_idx = 1; args_idx < args_num; ++args_idx)
{
const auto & current_arg = arguments[args_idx];
const ColumnArray * current_column = nullptr;
if (const auto * const_array = checkAndGetColumnConst<ColumnArray>(current_arg.column.get()))
{
current_column = checkAndGetColumn<ColumnArray>(&const_array->getDataColumn());
length = current_column->getOffsets()[0];
}
else
{
current_column = checkAndGetColumn<ColumnArray>(current_arg.column.get());
const auto & offsets = current_column->getOffsets();
length = offsets[i] - offsets[i - 1];
}
if (args_idx == 1)
{
first_length = length;
}
else if (first_length != length)
{
throw Exception(
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH,
"Elements '{}' and '{}' of Nested data structure (Array columns) "
"have different array sizes ({} and {} respectively) on row {}",
arguments[1].name, arguments[args_idx].name, first_length, length, i);
}
}
}
return ColumnUInt8::create(input_rows_count, 1);
}
void registerFunctionValidateNestedArraySizes(FunctionFactory & factory)
{
factory.registerFunction<FunctionValidateNestedArraySizes>();
}
}

View File

@ -532,6 +532,7 @@ SRCS(
upper.cpp
upperUTF8.cpp
uptime.cpp
validateNestedArraySizes.cpp
version.cpp
visibleWidth.cpp
visitParamExtractBool.cpp

View File

@ -12,10 +12,12 @@
# include <utility>
namespace ProfileEvents
{
extern const Event S3ReadMicroseconds;
extern const Event S3ReadBytes;
extern const Event S3ReadRequestsErrors;
}
namespace DB
@ -29,26 +31,58 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, size_t buffer_size_)
: SeekableReadBuffer(nullptr, 0), client_ptr(std::move(client_ptr_)), bucket(bucket_), key(key_), buffer_size(buffer_size_)
std::shared_ptr<Aws::S3::S3Client> client_ptr_, const String & bucket_, const String & key_, UInt64 s3_max_single_read_retries_, size_t buffer_size_)
: SeekableReadBuffer(nullptr, 0)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, key(key_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, buffer_size(buffer_size_)
{
}
bool ReadBufferFromS3::nextImpl()
{
if (!initialized)
{
/// Restoring valid value of `count()` during `nextImpl()`. See `ReadBuffer::next()`.
pos = working_buffer.begin();
if (!impl)
impl = initialize();
initialized = true;
}
Stopwatch watch;
auto res = impl->next();
bool next_result = false;
for (Int64 attempt = static_cast<Int64>(s3_max_single_read_retries); attempt >= 0; --attempt)
{
if (!impl)
impl = initialize();
try
{
next_result = impl->next();
/// FIXME. 1. Poco `istream` cannot read less than buffer_size or this state is being discarded during
/// istream <-> iostream conversion. `gcount` always contains 0,
/// that's why we always have error "Cannot read from istream at offset 0".
break;
}
catch (const Exception & e)
{
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Remaining attempts: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message());
impl.reset();
if (!attempt)
throw;
}
}
watch.stop();
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
if (!res)
if (!next_result)
return false;
internal_buffer = impl->buffer();
@ -60,7 +94,7 @@ bool ReadBufferFromS3::nextImpl()
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
{
if (initialized)
if (impl)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
@ -74,7 +108,6 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
return offset;
}
off_t ReadBufferFromS3::getPosition()
{
return offset + count();
@ -82,13 +115,13 @@ off_t ReadBufferFromS3::getPosition()
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, std::to_string(offset));
LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, getPosition());
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (offset != 0)
req.SetRange("bytes=" + std::to_string(offset) + "-");
if (getPosition())
req.SetRange("bytes=" + std::to_string(getPosition()) + "-");
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
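
Taken together, the changes above make nextImpl() retry a failed read up to s3_max_single_read_retries times: on an exception the S3ReadRequestsErrors counter is bumped, the underlying stream is dropped, and the next attempt re-issues the GET request with a Range header starting at getPosition(). A self-contained sketch of that bounded retry loop (hypothetical helper, not from this commit):

#include <cstdint>
#include <functional>
#include <iostream>
#include <stdexcept>

/// Illustrative sketch of the retry pattern added to ReadBufferFromS3::nextImpl():
/// try the read, and on failure drop the stream and retry until the attempt
/// budget is exhausted, then rethrow the last error.
bool readWithRetries(const std::function<bool()> & try_read, uint64_t max_single_read_retries)
{
    for (int64_t attempt = static_cast<int64_t>(max_single_read_retries); attempt >= 0; --attempt)
    {
        try
        {
            return try_read();  /// success: stop retrying
        }
        catch (const std::exception & e)
        {
            std::cerr << "Read failed, attempts left: " << attempt << ", error: " << e.what() << '\n';
            if (!attempt)
                throw;  /// budget exhausted: propagate the error
            /// otherwise loop again; the real code resets `impl` so the next attempt
            /// re-issues the request starting from the current position
        }
    }
    return false;  /// not reachable, keeps the compiler satisfied
}

int main()
{
    int failures_left = 2;
    bool ok = readWithRetries([&]
    {
        if (failures_left-- > 0)
            throw std::runtime_error("transient network error");
        return true;
    }, /* max_single_read_retries = */ 4);
    std::cout << (ok ? "read succeeded" : "read failed") << '\n';
}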

View File

@ -27,8 +27,8 @@ private:
std::shared_ptr<Aws::S3::S3Client> client_ptr;
String bucket;
String key;
UInt64 s3_max_single_read_retries;
size_t buffer_size;
bool initialized = false;
off_t offset = 0;
Aws::S3::Model::GetObjectResult read_result;
std::unique_ptr<ReadBuffer> impl;
@ -40,6 +40,7 @@ public:
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
UInt64 s3_max_single_read_retries_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;

View File

@ -996,6 +996,19 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
create.attach_from_path = std::nullopt;
}
if (create.attach)
{
/// If the table was detached, it's not possible to attach it back while some threads are still using
/// the old instance of the storage. For example, AsynchronousMetrics may cause ATTACH to fail,
/// so we allow waiting here. If database_atomic_wait_for_drop_and_detach_synchronously is disabled
/// and the old storage instance still exists, an exception will be thrown.
bool throw_if_table_in_use = getContext()->getSettingsRef().database_atomic_wait_for_drop_and_detach_synchronously;
if (throw_if_table_in_use)
database->checkDetachedTableNotInUse(create.uuid);
else
database->waitDetachedTableNotInUse(create.uuid);
}
StoragePtr res;
/// NOTE: CREATE query may be rewritten by Storage creator or table function
if (create.as_table_function)

View File

@ -1658,7 +1658,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
{
/// Table.
if (max_streams == 0)
throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR);
max_streams = 1;
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
if (max_streams > 1 && !is_remote)

View File

@ -26,6 +26,7 @@
#include <Parsers/formatAST.h>
#include <IO/WriteHelpers.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <DataTypes/NestedUtils.h>
namespace DB
@ -349,6 +350,35 @@ static void validateUpdateColumns(
}
}
/// Returns ASTs of the updated nested subcolumns, but only if all of the subcolumns were updated.
/// They are used to validate the sizes of the nested arrays.
/// If some of the subcolumns were updated and some weren't,
/// it is enough to validate only the updated columns against their old versions,
/// because the sizes of the untouched ones couldn't change: the sizes of all nested subcolumns must stay consistent.
static std::optional<std::vector<ASTPtr>> getExpressionsOfUpdatedNestedSubcolumns(
const String & column_name,
const NamesAndTypesList & all_columns,
const std::unordered_map<String, ASTPtr> & column_to_update_expression)
{
std::vector<ASTPtr> res;
auto source_name = Nested::splitName(column_name).first;
/// Check this nested subcolumn
for (const auto & column : all_columns)
{
auto split = Nested::splitName(column.name);
if (isArray(column.type) && split.first == source_name && !split.second.empty())
{
auto it = column_to_update_expression.find(column.name);
if (it == column_to_update_expression.end())
return {};
res.push_back(it->second);
}
}
return res;
}
ASTPtr MutationsInterpreter::prepare(bool dry_run)
{
@ -398,7 +428,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
auto dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns);
/// First, break a sequence of commands into stages.
for (const auto & command : commands)
for (auto & command : commands)
{
if (command.type == MutationCommand::DELETE)
{
@ -438,12 +468,43 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
///
/// Outer CAST is added just in case if we don't trust the returning type of 'if'.
auto type_literal = std::make_shared<ASTLiteral>(columns_desc.getPhysical(column).type->getName());
const auto & type = columns_desc.getPhysical(column).type;
auto type_literal = std::make_shared<ASTLiteral>(type->getName());
const auto & update_expr = kv.second;
ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command);
/// Additionally add the validateNestedArraySizes check for Nested subcolumns
if (isArray(type) && !Nested::splitName(column).second.empty())
{
std::shared_ptr<ASTFunction> function = nullptr;
auto nested_update_exprs = getExpressionsOfUpdatedNestedSubcolumns(column, all_columns, command.column_to_update_expression);
if (!nested_update_exprs)
{
function = makeASTFunction("validateNestedArraySizes",
condition,
update_expr->clone(),
std::make_shared<ASTIdentifier>(column));
condition = makeASTFunction("and", condition, function);
}
else if (nested_update_exprs->size() > 1)
{
function = std::make_shared<ASTFunction>();
function->name = "validateNestedArraySizes";
function->arguments = std::make_shared<ASTExpressionList>();
function->children.push_back(function->arguments);
function->arguments->children.push_back(condition);
for (const auto & it : *nested_update_exprs)
function->arguments->children.push_back(it->clone());
condition = makeASTFunction("and", condition, function);
}
}
auto updated_column = makeASTFunction("CAST",
makeASTFunction("if",
getPartitionAndPredicateExpressionForMutationCommand(command),
condition,
makeASTFunction("CAST",
update_expr->clone(),
type_literal),

View File

@ -201,7 +201,10 @@ void CSVRowInputFormat::readPrefix()
return;
}
else
{
skipRow(in, format_settings.csv, num_columns);
setupAllColumnsByTableSchema();
}
}
else if (!column_mapping->is_set)
setupAllColumnsByTableSchema();

View File

@ -996,8 +996,6 @@ bool TCPHandler::receivePacket()
switch (packet_type)
{
case Protocol::Client::ReadTaskResponse:
throw Exception("ReadTaskResponse must be received only after requesting in callback", ErrorCodes::LOGICAL_ERROR);
case Protocol::Client::IgnoredPartUUIDs:
/// Part uuids packet if any comes before query.
receiveIgnoredPartUUIDs();

View File

@ -9,14 +9,15 @@
#include <IO/Operators.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
}
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";

View File

@ -17,6 +17,7 @@
namespace DB
{
namespace detail
{
struct HDFSFsDeleter
@ -28,16 +29,14 @@ namespace detail
};
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo() : file_info(nullptr) , length(0) {}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
@ -49,17 +48,30 @@ struct HDFSFileInfo
}
};
class HDFSBuilderWrapper
{
hdfsBuilder * hdfs_builder;
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
std::vector<std::pair<String, String>> config_stor;
static const String CONFIG_PREFIX;
public:
HDFSBuilderWrapper() : hdfs_builder(hdfsNewBuilder()) {}
~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); }
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
hdfsBuilder * get() { return hdfs_builder; }
private:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
String getKinitCmd();
void runKinit();
// hdfs builder relies on an external config data storage
std::pair<String, String>& keep(const String & k, const String & v)
@ -67,48 +79,24 @@ class HDFSBuilderWrapper
return config_stor.emplace_back(std::make_pair(k, v));
}
hdfsBuilder * hdfs_builder;
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
std::vector<std::pair<String, String>> config_stor;
bool need_kinit{false};
static const String CONFIG_PREFIX;
private:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
String getKinitCmd();
void runKinit();
public:
hdfsBuilder *
get()
{
return hdfs_builder;
}
HDFSBuilderWrapper()
: hdfs_builder(hdfsNewBuilder())
{
}
~HDFSBuilderWrapper()
{
hdfsFreeBuilder(hdfs_builder);
}
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
};
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// Set the read/connect timeout; the default value in libhdfs3 is about 1 hour, which is too large.
/// TODO Allow tuning it from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif
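
The reordered HDFSBuilderWrapper above is an RAII wrapper over the raw hdfsBuilder handle (non-copyable, movable, freed in the destructor), and HDFSFSPtr does the same for hdfsFS via std::unique_ptr with a custom deleter. A small, self-contained sketch of both patterns, using hypothetical stand-ins for the libhdfs3 API:

#include <iostream>
#include <memory>

/// Hypothetical C-style API standing in for hdfsNewBuilder / hdfsFreeBuilder.
struct FakeHandle { int id; };
FakeHandle * fakeAcquire() { return new FakeHandle{42}; }
void fakeRelease(FakeHandle * h) { delete h; }

/// Option 1: std::unique_ptr with a custom deleter (like HDFSFSPtr).
struct FakeHandleDeleter
{
    void operator()(FakeHandle * h) const { fakeRelease(h); }
};
using FakeHandlePtr = std::unique_ptr<FakeHandle, FakeHandleDeleter>;

/// Option 2: a small wrapper class (like HDFSBuilderWrapper): no copies, moves allowed,
/// the handle is released exactly once in the destructor.
class HandleWrapper
{
public:
    HandleWrapper() : handle(fakeAcquire()) {}
    ~HandleWrapper() { if (handle) fakeRelease(handle); }
    HandleWrapper(const HandleWrapper &) = delete;
    HandleWrapper & operator=(const HandleWrapper &) = delete;
    HandleWrapper(HandleWrapper && other) noexcept : handle(other.handle) { other.handle = nullptr; }
    FakeHandle * get() const { return handle; }
private:
    FakeHandle * handle;
};

int main()
{
    FakeHandlePtr p(fakeAcquire());
    HandleWrapper w;
    std::cout << p->id << ' ' << w.get()->id << '\n';  /// both handles are freed automatically
}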

View File

@ -8,6 +8,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
@ -21,34 +22,39 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
/// HDFS create/open functions are not thread safe
static std::mutex hdfs_init_mutex;
std::string hdfs_uri;
String hdfs_uri;
String hdfs_file_path;
hdfsFile fin;
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const std::string & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_),
builder(createHDFSBuilder(hdfs_uri, config_))
: hdfs_uri(hdfs_uri_)
, hdfs_file_path(hdfs_file_path_)
, builder(createHDFSBuilder(hdfs_uri_, config_))
{
std::lock_guard lock(hdfs_init_mutex);
fs = createHDFSFS(builder.get());
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
if (fin == nullptr)
throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_OPEN_FILE);
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
}
int read(char * start, size_t size) const
{
int bytes_read = hdfsRead(fs.get(), fin, start, size);
if (bytes_read < 0)
throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
throw Exception(ErrorCodes::NETWORK_ERROR,
"Fail to read from HDFS: {}, file path: {}. Error: {}",
hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
return bytes_read;
}
@ -62,11 +68,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
ReadBufferFromHDFS::ReadBufferFromHDFS(
const String & hdfs_uri_,
const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
: BufferWithOwnMemory<ReadBuffer>(buf_size_)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_))
{
}

View File

@ -7,11 +7,8 @@
#include <IO/BufferWithOwnMemory.h>
#include <string>
#include <memory>
#include <hdfs/hdfs.h>
#include <common/types.h>
#include <Interpreters/Context.h>
@ -22,13 +19,19 @@ namespace DB
*/
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
{
struct ReadBufferFromHDFSImpl;
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
struct ReadBufferFromHDFSImpl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~ReadBufferFromHDFS() override;
bool nextImpl() override;
private:
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
};
}
#endif

View File

@ -122,7 +122,7 @@ public:
current_path = uri + path;
auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, getContext()->getGlobalContext()->getConfigRef()), compression);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
@ -271,7 +271,15 @@ Pipe StorageHDFS::read(
size_t max_block_size,
unsigned num_streams)
{
const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
size_t begin_of_path;
/// This uri is checked for correctness in constructor of StorageHDFS and never modified afterwards
auto two_slash = uri.find("//");
if (two_slash == std::string::npos)
begin_of_path = uri.find('/');
else
begin_of_path = uri.find('/', two_slash + 2);
const String path_from_uri = uri.substr(begin_of_path);
const String uri_without_path = uri.substr(0, begin_of_path);
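
The change above makes the path extraction tolerant of URIs without a "//" separator: it looks for "//" first and only then for the '/' that starts the path, so "hdfs://hdfs1:9000/dir/file" splits into "hdfs://hdfs1:9000" and "/dir/file". A standalone sketch of that splitting logic (hypothetical helper, not from this commit):

#include <iostream>
#include <string>
#include <utility>

/// Hypothetical helper mirroring the URI handling in StorageHDFS::read() above.
std::pair<std::string, std::string> splitHDFSUri(const std::string & uri)
{
    size_t begin_of_path;
    auto two_slash = uri.find("//");
    if (two_slash == std::string::npos)
        begin_of_path = uri.find('/');                 /// e.g. "host:9000/path"
    else
        begin_of_path = uri.find('/', two_slash + 2);  /// skip the "//" of the scheme
    return {uri.substr(0, begin_of_path), uri.substr(begin_of_path)};
}

int main()
{
    auto [endpoint, path] = splitHDFSUri("hdfs://hdfs1:9000/some/dir/file*");
    std::cout << endpoint << '\n' << path << '\n';  /// "hdfs://hdfs1:9000" and "/some/dir/file*"
}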
@ -281,6 +289,9 @@ Pipe StorageHDFS::read(
auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();
sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri);
if (sources_info->uris.empty())
LOG_WARNING(log, "No file in HDFS matches the path: {}", uri);
for (const auto & column : column_names)
{
if (column == "_path")

View File

@ -42,7 +42,7 @@ protected:
const String & compression_method_);
private:
String uri;
const String uri;
String format_name;
String compression_method;

View File

@ -3,6 +3,7 @@
#include <Compression/CompressionFactory.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <Common/escapeForFileName.h>
namespace DB
{
@ -393,8 +394,9 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot validate column of non fixed type {}", type.getName());
auto disk = data_part->volume->getDisk();
String mrk_path = fullPath(disk, part_path + name + marks_file_extension);
String bin_path = fullPath(disk, part_path + name + DATA_FILE_EXTENSION);
String escaped_name = escapeForFileName(name);
String mrk_path = fullPath(disk, part_path + escaped_name + marks_file_extension);
String bin_path = fullPath(disk, part_path + escaped_name + DATA_FILE_EXTENSION);
DB::ReadBufferFromFile mrk_in(mrk_path);
DB::CompressedReadBufferFromFile bin_in(bin_path, 0, 0, 0, nullptr);
bool must_be_last = false;

View File

@ -92,7 +92,7 @@ Pipe StorageMaterializeMySQL::read(
{
Block pipe_header = pipe.getHeader();
auto syntax = TreeRewriter(context).analyze(expressions, pipe_header.getNamesAndTypesList());
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true);
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expressions, syntax, context).getActions(true /* add_aliases */, false /* project_result */);
pipe.addSimpleTransform([&](const Block & header)
{

View File

@ -146,7 +146,9 @@ public:
{
WriteBufferFromOwnString sqlbuf;
sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT") << " INTO ";
sqlbuf << backQuoteMySQL(remote_database_name) << "." << backQuoteMySQL(remote_table_name);
if (!remote_database_name.empty())
sqlbuf << backQuoteMySQL(remote_database_name) << ".";
sqlbuf << backQuoteMySQL(remote_table_name);
sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES ";
auto writer = FormatFactory::instance().getOutputStream("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.getContext());

View File

@ -166,6 +166,7 @@ StorageS3Source::StorageS3Source(
ContextPtr context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
UInt64 s3_max_single_read_retries_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket_,
@ -177,6 +178,7 @@ StorageS3Source::StorageS3Source(
, format(format_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, compression_hint(compression_hint_)
, client(client_)
, sample_block(sample_block_)
@ -197,7 +199,7 @@ bool StorageS3Source::initialize()
file_path = bucket + "/" + current_key;
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key), chooseCompressionMethod(current_key, compression_hint));
std::make_unique<ReadBufferFromS3>(client, bucket, current_key, s3_max_single_read_retries), chooseCompressionMethod(current_key, compression_hint));
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(input_format);
@ -312,6 +314,7 @@ StorageS3::StorageS3(
const String & secret_access_key_,
const StorageID & table_id_,
const String & format_name_,
UInt64 s3_max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
@ -323,6 +326,7 @@ StorageS3::StorageS3(
: IStorage(table_id_)
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, format_name(format_name_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, compression_method(compression_method_)
@ -389,6 +393,7 @@ Pipe StorageS3::read(
local_context,
metadata_snapshot->getColumns(),
max_block_size,
s3_max_single_read_retries,
compression_method,
client_auth.client,
client_auth.uri.bucket,
@ -474,6 +479,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
UInt64 s3_max_single_read_retries = args.getLocalContext()->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = args.getLocalContext()->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.getLocalContext()->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = args.getLocalContext()->getSettingsRef().s3_max_connections;
@ -497,6 +503,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
secret_access_key,
args.table_id,
format_name,
s3_max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
max_connections,

View File

@ -55,6 +55,7 @@ public:
ContextPtr context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
UInt64 s3_max_single_read_retries_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket,
@ -71,6 +72,7 @@ private:
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
UInt64 s3_max_single_read_retries;
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
@ -100,6 +102,7 @@ public:
const String & secret_access_key,
const StorageID & table_id_,
const String & format_name_,
UInt64 s3_max_single_read_retries_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
@ -145,6 +148,7 @@ private:
ClientAuthentificaiton client_auth;
String format_name;
UInt64 s3_max_single_read_retries;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
String compression_method;

View File

@ -83,12 +83,17 @@ ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr contex
const auto columns = tables_and_columns.find(remote_table_name);
if (columns == tables_and_columns.end())
throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("MySQL table " + (remote_database_name.empty() ? "" : (backQuote(remote_database_name) + "."))
+ backQuote(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return ColumnsDescription{columns->second};
}
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr TableFunctionMySQL::executeImpl(
const ASTPtr & /*ast_function*/,
ContextPtr context,
const std::string & table_name,
ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);

View File

@ -83,6 +83,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
{
Poco::URI uri (filename);
S3::URI s3_uri (uri);
UInt64 s3_max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
@ -93,6 +94,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
s3_max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
max_connections,

View File

@ -109,12 +109,17 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
Poco::URI uri (filename);
S3::URI s3_uri (uri);
/// Actually these parameters are not used
UInt64 s3_max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
storage = StorageS3::create(
s3_uri, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
format, min_upload_part_size, max_single_part_upload_size, max_connections,
format,
s3_max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
max_connections,
getActualTableStructure(context), ConstraintsDescription{},
context, compression_method, /*distributed_processing=*/true);
}

View File

@ -843,6 +843,17 @@ def system_tables_test(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("CREATE DATABASE system_tables_test ENGINE = MaterializeMySQL('{}:3306', 'system_tables_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT partition_key, sorting_key, primary_key FROM system.tables WHERE database = 'system_tables_test' AND name = 'test'", "intDiv(id, 4294967)\tid\tid\n")
def move_to_prewhere_and_column_filtering(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS cond_on_key_col")
mysql_node.query("DROP DATABASE IF EXISTS cond_on_key_col")
mysql_node.query("CREATE DATABASE cond_on_key_col")
clickhouse_node.query("CREATE DATABASE cond_on_key_col ENGINE = MaterializeMySQL('{}:3306', 'cond_on_key_col', 'root', 'clickhouse')".format(service_name))
mysql_node.query("create table cond_on_key_col.products (id int primary key, product_id int not null, catalog_id int not null, brand_id int not null, name text)")
mysql_node.query("insert into cond_on_key_col.products (id, name, catalog_id, brand_id, product_id) values (915, 'ertyui', 5287, 15837, 0), (990, 'wer', 1053, 24390, 1), (781, 'qwerty', 1041, 1176, 2);")
check_query(clickhouse_node, "SELECT DISTINCT P.id, P.name, P.catalog_id FROM cond_on_key_col.products P WHERE P.name ILIKE '%e%' and P.catalog_id=5287", '915\tertyui\t5287\n')
clickhouse_node.query("DROP DATABASE cond_on_key_col")
mysql_node.query("DROP DATABASE cond_on_key_col")
def mysql_settings_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
@ -858,3 +869,4 @@ def mysql_settings_test(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")

View File

@ -153,6 +153,7 @@ def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.materialize_mysql_database_with_views(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.move_to_prewhere_and_column_filtering(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
@ -160,6 +161,7 @@ def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.materialize_mysql_database_with_views(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.move_to_prewhere_and_column_filtering(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])

View File

@ -210,6 +210,14 @@ def test_virtual_columns(started_cluster):
expected = "1\tfile1\thdfs://hdfs1:9000//file1\n2\tfile2\thdfs://hdfs1:9000//file2\n3\tfile3\thdfs://hdfs1:9000//file3\n"
assert node1.query("select id, _file as file_name, _path as file_path from virtual_cols order by id") == expected
def test_read_files_with_spaces(started_cluster):
started_cluster.hdfs_api.write_data("/test test test 1.txt", "1\n")
started_cluster.hdfs_api.write_data("/test test test 2.txt", "2\n")
started_cluster.hdfs_api.write_data("/test test test 3.txt", "3\n")
node1.query("create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/test*', 'TSV')")
assert node1.query("select * from test order by id") == "1\n2\n3\n"
if __name__ == '__main__':
cluster.start()

View File

@ -1,3 +1,5 @@
import sys
from bottle import abort, route, run, request, response
@ -21,4 +23,4 @@ def ping():
return 'OK'
run(host='0.0.0.0', port=8080)
run(host='0.0.0.0', port=int(sys.argv[1]))

View File

@ -0,0 +1,90 @@
import http.server
import random
import re
import socket
import struct
import sys
def gen_n_digit_number(n):
assert 0 < n < 19
return random.randint(10**(n-1), 10**n-1)
def gen_line():
columns = 4
row = []
def add_number():
digits = random.randint(1, 18)
row.append(gen_n_digit_number(digits))
for i in range(columns // 2):
add_number()
row.append(1)
for i in range(columns - 1 - columns // 2):
add_number()
line = ",".join(map(str, row)) + "\n"
return line.encode()
random.seed("Unstable server/1.0")
lines = b"".join((gen_line() for _ in range(500000)))
class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
if self.path == "/root/test.csv":
self.from_bytes = 0
self.end_bytes = len(lines)
self.size = self.end_bytes
self.send_block_size = 256
self.stop_at = random.randint(900000, 1200000) // self.send_block_size # Block size is 1024**2.
if "Range" in self.headers:
cr = self.headers["Range"]
parts = re.split("[ -/=]+", cr)
assert parts[0] == "bytes"
self.from_bytes = int(parts[1])
if parts[2]:
self.end_bytes = int(parts[2])+1
self.send_response(206)
self.send_header("Content-Range", f"bytes {self.from_bytes}-{self.end_bytes-1}/{self.size}")
else:
self.send_response(200)
self.send_header("Accept-Ranges", "bytes")
self.send_header("Content-Type", "text/plain")
self.send_header("Content-Length", f"{self.end_bytes-self.from_bytes}")
self.end_headers()
elif self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
else:
self.send_response(404)
self.send_header("Content-Type", "text/plain")
self.end_headers()
def do_GET(self):
self.do_HEAD()
if self.path == "/root/test.csv":
for c, i in enumerate(range(self.from_bytes, self.end_bytes, self.send_block_size)):
self.wfile.write(lines[i:min(i+self.send_block_size, self.end_bytes)])
if (c + 1) % self.stop_at == 0:
#self.wfile._sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 0, 0))
#self.wfile._sock.shutdown(socket.SHUT_RDWR)
#self.wfile._sock.close()
print('Dropping connection')
break
elif self.path == "/":
self.wfile.write(b"OK")
httpd = http.server.HTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()

View File

@ -96,7 +96,7 @@ def cluster():
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
run_s3_mock(cluster)
run_s3_mocks(cluster)
yield cluster
finally:
@ -384,26 +384,32 @@ def test_s3_glob_scheherazade(cluster):
assert run_query(instance, query).splitlines() == ["1001\t1001\t1001\t1001"]
def run_s3_mock(cluster):
logging.info("Starting s3 mock")
container_id = cluster.get_container_id('resolver')
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py")
cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True)
def run_s3_mocks(cluster):
logging.info("Starting s3 mocks")
mocks = (
("mock_s3.py", "resolver", "8080"),
("unstable_server.py", "resolver", "8081"),
)
for mock_filename, container, port in mocks:
container_id = cluster.get_container_id(container)
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename)
cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)
# Wait for S3 mock start
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
# Wait for S3 mocks to start
for mock_filename, container, port in mocks:
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id(container),
["curl", "-s", f"http://{container}:{port}/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else:
time.sleep(1)
else:
time.sleep(1)
else:
break
break
logging.info("S3 mock started")
logging.info("S3 mocks started")
def replace_config(old, new):
@ -523,6 +529,15 @@ def test_storage_s3_get_gzip(cluster, extension, method):
run_query(instance, f"DROP TABLE {name}")
def test_storage_s3_get_unstable(cluster):
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"]
table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
get_query = f"SELECT count(), sum(column3) FROM s3('http://resolver:8081/{cluster.minio_bucket}/test.csv', 'CSV', '{table_format}') FORMAT CSV"
result = run_query(instance, get_query)
assert result.splitlines() == ["500000,500000"]
def test_storage_s3_put_uncompressed(cluster):
bucket = cluster.minio_bucket
instance = cluster.instances["dummy"]

View File

@ -4,13 +4,31 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
common_opts=(
"--format=Null"
# We check that even if max_threads is small, the setting max_distributed_connections
# will allow to process queries on multiple shards concurrently.
"--max_threads=1"
"--max_distributed_connections=3"
)
# We sleep for 1.5 seconds on ten machines.
# If the concurrency is one (bad), the query will take at least 15 seconds, and the following loops are guaranteed to be infinite.
# If the concurrency is 10 (good), the query may take less than 10 seconds with non-zero probability,
# and the following loops will finish with probability 1, assuming independent random variables.
# NOTE: the test uses a higher timeout to avoid flakiness.
timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=0
timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=1
while true; do
timeout 10 ${CLICKHOUSE_CLIENT} --max_threads 1 --max_distributed_connections 10 --query "
SELECT sleep(1.5) FROM remote('127.{1..10}', system.one) FORMAT Null" --prefer_localhost_replica=0 && break
done
while true; do
timeout 10 ${CLICKHOUSE_CLIENT} --max_threads 1 --max_distributed_connections 10 --query "
SELECT sleep(1.5) FROM remote('127.{1..10}', system.one) FORMAT Null" --prefer_localhost_replica=1 && break
done
# If max_distributed_connections is low and async_socket_for_remote is disabled,
# the concurrency of distributed queries will be also low.
timeout 1 ${CLICKHOUSE_CLIENT} --max_threads 1 --max_distributed_connections 1 --async_socket_for_remote 0 --query "
SELECT sleep(0.15) FROM remote('127.{1..10}', system.one) FORMAT Null" --prefer_localhost_replica=0 && echo 'Fail'
timeout 1 ${CLICKHOUSE_CLIENT} --max_threads 1 --max_distributed_connections 1 --async_socket_for_remote 0 --query "
SELECT sleep(0.15) FROM remote('127.{1..10}', system.one) FORMAT Null" --prefer_localhost_replica=1 && echo 'Fail'
echo 'Ok'

View File

@ -15,13 +15,13 @@ drop table if exists simple;
create table simple (i int, j int) engine = MergeTree order by i
settings index_granularity = 1, max_concurrent_queries = 1, min_marks_to_honor_max_concurrent_queries = 2;
insert into simple select number, number + 100 from numbers(1000);
insert into simple select number, number + 100 from numbers(5000);
"
query_id="long_running_query-$CLICKHOUSE_DATABASE"
echo "Spin up a long running query"
${CLICKHOUSE_CLIENT} --query "select sleepEachRow(0.01) from simple settings max_block_size = 1 format Null" --query_id "$query_id" > /dev/null 2>&1 &
${CLICKHOUSE_CLIENT} --query "select sleepEachRow(0.1) from simple settings max_block_size = 1 format Null" --query_id "$query_id" > /dev/null 2>&1 &
wait_for_query_to_start "$query_id"
# query which reads marks >= min_marks_to_honor_max_concurrent_queries is throttled

View File

@ -0,0 +1,21 @@
1 [100,200] ['aa','bb'] [1,2]
0 [0,1] ['aa','bb'] [0,0]
1 [100,200] ['aa','bb'] [1,2]
2 [100,200,300] ['a','b','c'] [10,20,30]
3 [3,4] ['aa','bb'] [3,6]
4 [4,5] ['aa','bb'] [4,8]
0 [0,1] ['aa','bb'] [0,0]
1 [100,200] ['aa','bb'] [1,2]
2 [100,200,300] ['a','b','c'] [100,200,300]
3 [3,4] ['aa','bb'] [3,6]
4 [4,5] ['aa','bb'] [4,8]
0 [0,1] ['aa','bb'] [0,0]
1 [100,200] ['aa','bb'] [1,2]
2 [100,200,300] ['a','b','c'] [100,200,300]
3 [68,72] ['aa','bb'] [68,72]
4 [4,5] ['aa','bb'] [4,8]
0 0 aa 0
1 1 bb 2
2 2 aa 4
3 3 aa 6
4 4 aa 8

View File

@ -0,0 +1,70 @@
DROP TABLE IF EXISTS test_wide_nested;
CREATE TABLE test_wide_nested
(
`id` Int,
`info.id` Array(Int),
`info.name` Array(String),
`info.age` Array(Int)
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;
set mutations_sync = 1;
INSERT INTO test_wide_nested SELECT number, [number,number + 1], ['aa','bb'], [number,number * 2] FROM numbers(5);
alter table test_wide_nested update `info.id` = [100,200] where id = 1;
select * from test_wide_nested where id = 1 order by id;
alter table test_wide_nested update `info.id` = [100,200,300], `info.age` = [10,20,30], `info.name` = ['a','b','c'] where id = 2;
select * from test_wide_nested;
alter table test_wide_nested update `info.id` = [100,200,300], `info.age` = `info.id`, `info.name` = ['a','b','c'] where id = 2;
select * from test_wide_nested;
alter table test_wide_nested update `info.id` = [100,200], `info.age`=[68,72] where id = 3;
alter table test_wide_nested update `info.id` = `info.age` where id = 3;
select * from test_wide_nested;
alter table test_wide_nested update `info.id` = [100,200], `info.age` = [10,20,30], `info.name` = ['a','b','c'] where id = 0; -- { serverError 341 }
-- Recreate table, because KILL MUTATION is not suitable for parallel tests execution.
DROP TABLE test_wide_nested;
CREATE TABLE test_wide_nested
(
`id` Int,
`info.id` Array(Int),
`info.name` Array(String),
`info.age` Array(Int)
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_wide_nested SELECT number, [number,number + 1], ['aa','bb'], [number,number * 2] FROM numbers(5);
alter table test_wide_nested update `info.id` = [100,200,300], `info.age` = [10,20,30] where id = 1; -- { serverError 341 }
DROP TABLE test_wide_nested;
DROP TABLE IF EXISTS test_wide_not_nested;
CREATE TABLE test_wide_not_nested
(
`id` Int,
`info.id` Int,
`info.name` String,
`info.age` Int
)
ENGINE = MergeTree
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_wide_not_nested SELECT number, number, 'aa', number * 2 FROM numbers(5);
ALTER TABLE test_wide_not_nested UPDATE `info.name` = 'bb' WHERE id = 1;
SELECT * FROM test_wide_not_nested ORDER BY id;
DROP TABLE test_wide_not_nested;

View File

@ -0,0 +1,2 @@
testdata1
testdata2

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS \`01818_with_names\`;"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE \`01818_with_names\` (t String) ENGINE = MergeTree ORDER BY t;"
echo -ne "t\ntestdata1\ntestdata2" | ${CLICKHOUSE_CLIENT} --input_format_with_names_use_header 0 --query "INSERT INTO \`01818_with_names\` FORMAT CSVWithNames"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM \`01818_with_names\`;"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS \`01818_with_names\`;"

View File

@ -0,0 +1 @@
[['a'],['b','c']]

View File

@ -0,0 +1,7 @@
create temporary table test (
arr Array(Array(LowCardinality(String)))
);
insert into test(arr) values ([['a'], ['b', 'c']]);
select arrayFilter(x -> 1, arr) from test;

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1 @@
select * from remote('127.1', system.one) settings max_distributed_connections=0;