Merge pull request #7085 from ClickHouse/merge-s3

Merge s3
alexey-milovidov 2019-09-24 17:51:57 +03:00 committed by GitHub
commit bc9667d881
7 changed files with 19 additions and 14 deletions


@@ -68,6 +68,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, idle_connection_timeout, 3600, "Close idle TCP connections after specified number of seconds.") \
M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.") \
M(SettingUInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.") \
+M(SettingUInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.") \
M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
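
Note on the new setting (not part of the diff): s3_min_upload_part_size supersedes the server-config key s3.minimum_upload_part_size that is removed further down, so the multipart chunk size can now be chosen per query. A minimal sketch of doing that through the integration-test helpers used later in this PR; the import path, endpoint URL, bucket and column list are illustrative assumptions:

from helpers.cluster import ClickHouseCluster  # assumed helper path from the ClickHouse integration tests

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("dummy")
cluster.start()

# Lower the minimum multipart part size to 1,000,000 bytes for this query only,
# instead of relying on the old server-wide <s3><minimum_upload_part_size> config key.
put_query = "insert into table function s3('http://mock-s3:9001/bucket/test.csv', 'CSV', 'a Int32, b Int32, c Int32') format CSV"
instance.query(put_query, stdin="1,2,3\n4,5,6\n", settings={'s3_min_upload_part_size': 1000000})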


@@ -79,13 +79,13 @@ namespace
public:
StorageS3BlockOutputStream(const Poco::URI & uri,
const String & format,
+UInt64 min_upload_part_size,
const Block & sample_block_,
const Context & context,
const ConnectionTimeouts & timeouts)
: sample_block(sample_block_)
{
-auto minimum_upload_part_size = context.getConfigRef().getUInt64("s3.minimum_upload_part_size", 512 * 1024 * 1024);
-write_buf = std::make_unique<WriteBufferFromS3>(uri, minimum_upload_part_size, timeouts);
+write_buf = std::make_unique<WriteBufferFromS3>(uri, min_upload_part_size, timeouts);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@@ -124,6 +124,7 @@ StorageS3::StorageS3(
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
+UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
@@ -133,6 +134,7 @@ StorageS3::StorageS3(
, format_name(format_name_)
, database_name(database_name_)
, table_name(table_name_)
+, min_upload_part_size(min_upload_part_size_)
{
setColumns(columns_);
setConstraints(constraints_);
@@ -171,7 +173,7 @@ void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_dat
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
return std::make_shared<StorageS3BlockOutputStream>(
-uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
+uri, format_name, min_upload_part_size, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
}
void registerStorageS3(StorageFactory & factory)
@@ -193,7 +195,9 @@ void registerStorageS3(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
-return StorageS3::create(uri, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
+UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
+return StorageS3::create(uri, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
});
}
}


@@ -21,6 +21,7 @@ public:
const std::string & database_name_,
const std::string & table_name_,
const String & format_name_,
+UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
@@ -59,6 +60,7 @@ private:
String format_name;
String database_name;
String table_name;
+UInt64 min_upload_part_size;
};
}


@@ -10,7 +10,8 @@ StoragePtr TableFunctionS3::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
{
Poco::URI uri(source);
-return StorageS3::create(uri, getDatabaseName(), table_name, format, columns, ConstraintsDescription{}, global_context);
+UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
+return StorageS3::create(uri, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context);
}
void registerTableFunctionS3(TableFunctionFactory & factory)


@@ -1,5 +0,0 @@
-<yandex>
-<s3>
-<minimum_upload_part_size>1000000</minimum_upload_part_size>
-</s3>
-</yandex>


@@ -34,7 +34,7 @@ def put_communication_data(started_cluster, body):
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("dummy", config_dir="configs", main_configs=["configs/min_chunk_size.xml"])
instance = cluster.add_instance("dummy")
cluster.start()
cluster.communication_port = 10000
@@ -64,9 +64,9 @@ def started_cluster():
cluster.shutdown()
-def run_query(instance, query, stdin=None):
+def run_query(instance, query, stdin=None, settings=None):
logging.info("Running query '{}'...".format(query))
-result = instance.query(query, stdin=stdin)
+result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
return result
@@ -151,7 +151,7 @@ def test_multipart_put(started_cluster):
long_data = [[i, i+1, i+2] for i in range(100000)]
long_values = "".join([ "{},{},{}\n".format(x,y,z) for x, y, z in long_data ])
put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.multipart_preserving_data_port, started_cluster.bucket, format)
-run_query(instance, put_query, stdin=long_values)
+run_query(instance, put_query, stdin=long_values, settings={'s3_min_upload_part_size': 1000000})
data = get_communication_data(started_cluster)
assert "multipart_received_data" in data
received_data = data["multipart_received_data"]
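
Why 1,000,000 is a sensible value here (a back-of-the-envelope check, not part of the diff; it reuses the data-generation expression from test_multipart_put above):

# The generated CSV is about 1.77 MB, which exceeds the 1,000,000-byte
# s3_min_upload_part_size passed to the query, so the payload is large enough
# to be split across more than one upload part of at least the configured
# minimum size, which is what the multipart_received_data checks above rely on.
long_data = [[i, i + 1, i + 2] for i in range(100000)]
long_values = "".join("{},{},{}\n".format(x, y, z) for x, y, z in long_data)
print(len(long_values))  # 1766685 bytes, i.e. at least two parts at 1,000,000 bytes each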


@@ -33,6 +33,7 @@ logging.getLogger().addHandler(logging.StreamHandler())
communication_port = int(sys.argv[1])
bucket = sys.argv[2]
def GetFreeTCPPortsAndIP(n):
result = []
sockets = []
@@ -53,6 +54,7 @@ def GetFreeTCPPortsAndIP(n):
redirecting_preserving_data_port
), localhost = GetFreeTCPPortsAndIP(5)
data = {
"redirecting_to_http_port": redirecting_to_http_port,
"preserving_data_port": preserving_data_port,