2021-08-29 14:18:04 +00:00
# include <Common/Exception.h>
# include <Common/TerminalSize.h>
# include <IO/ReadHelpers.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/WriteHelpers.h>
# include <IO/WriteBufferFromHTTP.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/copyData.h>
2022-03-08 17:05:55 +00:00
# include <Disks/IO/createReadBufferFromFileBase.h>
2021-08-29 14:18:04 +00:00
# include <boost/program_options.hpp>
# include <re2/re2.h>
# include <filesystem>
namespace fs = std : : filesystem ;
2021-09-08 17:22:24 +00:00
# define EXTRACT_PATH_PATTERN ".*\\ / store / (.*)"
2021-08-29 14:18:04 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS ;
}
/*
* A tool to collect table data files on local fs as is ( into current directory or into path from - - output - dir option ) .
* If test - mode option is added , files will be put by given url via PUT request .
*/
2022-02-09 00:50:50 +00:00
void processFile ( const fs : : path & file_path , const fs : : path & dst_path , bool test_mode , bool link , WriteBuffer & metadata_buf )
2021-08-29 14:18:04 +00:00
{
2021-09-08 17:22:24 +00:00
String remote_path ;
RE2 : : FullMatch ( file_path . string ( ) , EXTRACT_PATH_PATTERN , & remote_path ) ;
bool is_directory = fs : : is_directory ( file_path ) ;
2021-09-13 06:53:38 +00:00
writeText ( file_path . filename ( ) . string ( ) , metadata_buf ) ;
2021-09-08 17:22:24 +00:00
writeChar ( ' \t ' , metadata_buf ) ;
writeBoolText ( is_directory , metadata_buf ) ;
if ( ! is_directory )
2021-08-29 14:18:04 +00:00
{
writeChar ( ' \t ' , metadata_buf ) ;
writeIntText ( fs : : file_size ( file_path ) , metadata_buf ) ;
2021-09-08 17:22:24 +00:00
}
writeChar ( ' \n ' , metadata_buf ) ;
if ( is_directory )
return ;
auto dst_file_path = fs : : path ( dst_path ) / remote_path ;
2022-02-09 00:50:50 +00:00
if ( link )
{
fs : : create_symlink ( file_path , dst_file_path ) ;
}
2021-09-08 17:22:24 +00:00
else
2022-02-09 00:50:50 +00:00
{
2022-09-26 10:49:20 +00:00
ReadSettings read_settings { } ;
read_settings . local_fs_method = LocalFSReadMethod : : pread ;
auto src_buf = createReadBufferFromFileBase ( file_path , read_settings , fs : : file_size ( file_path ) ) ;
2022-02-09 00:50:50 +00:00
std : : shared_ptr < WriteBuffer > dst_buf ;
/// test mode for integration tests.
if ( test_mode )
dst_buf = std : : make_shared < WriteBufferFromHTTP > ( Poco : : URI ( dst_file_path ) , Poco : : Net : : HTTPRequest : : HTTP_PUT ) ;
else
dst_buf = std : : make_shared < WriteBufferFromFile > ( dst_file_path ) ;
2021-09-08 17:22:24 +00:00
2022-02-09 00:50:50 +00:00
copyData ( * src_buf , * dst_buf ) ;
dst_buf - > next ( ) ;
dst_buf - > finalize ( ) ;
}
2022-05-16 18:59:27 +00:00
}
2021-08-29 14:18:04 +00:00
2022-02-09 00:50:50 +00:00
void processTableFiles ( const fs : : path & data_path , fs : : path dst_path , bool test_mode , bool link )
2021-09-08 17:22:24 +00:00
{
std : : cerr < < " Data path: " < < data_path < < " , destination path: " < < dst_path < < std : : endl ;
String prefix ;
RE2 : : FullMatch ( data_path . string ( ) , EXTRACT_PATH_PATTERN , & prefix ) ;
std : : shared_ptr < WriteBuffer > root_meta ;
if ( test_mode )
{
dst_path / = " store " ;
auto files_root = dst_path / prefix ;
root_meta = std : : make_shared < WriteBufferFromHTTP > ( Poco : : URI ( files_root / " .index " ) , Poco : : Net : : HTTPRequest : : HTTP_PUT ) ;
}
else
{
dst_path = fs : : canonical ( dst_path ) ;
auto files_root = dst_path / prefix ;
fs : : create_directories ( files_root ) ;
root_meta = std : : make_shared < WriteBufferFromFile > ( files_root / " .index " ) ;
}
2021-08-29 14:18:04 +00:00
2021-09-08 17:22:24 +00:00
fs : : directory_iterator dir_end ;
for ( fs : : directory_iterator dir_it ( data_path ) ; dir_it ! = dir_end ; + + dir_it )
2021-08-29 14:18:04 +00:00
{
if ( dir_it - > is_directory ( ) )
{
2022-02-09 00:50:50 +00:00
processFile ( dir_it - > path ( ) , dst_path , test_mode , link , * root_meta ) ;
2021-09-08 17:22:24 +00:00
String directory_prefix ;
RE2 : : FullMatch ( dir_it - > path ( ) . string ( ) , EXTRACT_PATH_PATTERN , & directory_prefix ) ;
std : : shared_ptr < WriteBuffer > directory_meta ;
if ( test_mode )
{
auto files_root = dst_path / prefix ;
directory_meta = std : : make_shared < WriteBufferFromHTTP > ( Poco : : URI ( dst_path / directory_prefix / " .index " ) , Poco : : Net : : HTTPRequest : : HTTP_PUT ) ;
}
else
{
dst_path = fs : : canonical ( dst_path ) ;
auto files_root = dst_path / prefix ;
fs : : create_directories ( dst_path / directory_prefix ) ;
directory_meta = std : : make_shared < WriteBufferFromFile > ( dst_path / directory_prefix / " .index " ) ;
}
2021-08-29 14:18:04 +00:00
fs : : directory_iterator files_end ;
for ( fs : : directory_iterator file_it ( dir_it - > path ( ) ) ; file_it ! = files_end ; + + file_it )
2022-02-09 00:50:50 +00:00
processFile ( file_it - > path ( ) , dst_path , test_mode , link , * directory_meta ) ;
2021-09-08 17:22:24 +00:00
directory_meta - > next ( ) ;
directory_meta - > finalize ( ) ;
2021-08-29 14:18:04 +00:00
}
else
{
2022-02-09 00:50:50 +00:00
processFile ( dir_it - > path ( ) , dst_path , test_mode , link , * root_meta ) ;
2021-08-29 14:18:04 +00:00
}
}
2021-09-08 17:22:24 +00:00
root_meta - > next ( ) ;
root_meta - > finalize ( ) ;
2021-08-29 14:18:04 +00:00
}
}
int mainEntryClickHouseStaticFilesDiskUploader ( int argc , char * * argv )
try
{
using namespace DB ;
namespace po = boost : : program_options ;
po : : options_description description ( " Allowed options " , getTerminalWidth ( ) ) ;
description . add_options ( )
( " help,h " , " produce help message " )
2022-09-21 03:14:31 +00:00
( " metadata-path " , po : : value < std : : string > ( ) , " Metadata path (SELECT data_paths FROM system.tables WHERE name = 'table_name' AND database = 'database_name') " )
2021-08-29 14:18:04 +00:00
( " test-mode " , " Use test mode, which will put data on given url via PUT " )
2022-02-09 00:50:50 +00:00
( " link " , " Create symlinks instead of copying " )
2021-08-29 14:18:04 +00:00
( " url " , po : : value < std : : string > ( ) , " Web server url for test mode " )
2021-09-03 21:43:15 +00:00
( " output-dir " , po : : value < std : : string > ( ) , " Directory to put files in non-test mode " ) ;
2021-08-29 14:18:04 +00:00
po : : parsed_options parsed = po : : command_line_parser ( argc , argv ) . options ( description ) . run ( ) ;
po : : variables_map options ;
po : : store ( parsed , options ) ;
po : : notify ( options ) ;
if ( options . empty ( ) | | options . count ( " help " ) )
{
std : : cout < < description < < std : : endl ;
2022-08-21 18:24:17 +00:00
exit ( 0 ) ; // NOLINT(concurrency-mt-unsafe)
2021-08-29 14:18:04 +00:00
}
2021-09-08 17:22:24 +00:00
String metadata_path ;
2021-08-29 14:18:04 +00:00
if ( options . count ( " metadata-path " ) )
metadata_path = options [ " metadata-path " ] . as < std : : string > ( ) ;
else
throw Exception ( ErrorCodes : : BAD_ARGUMENTS , " No metadata-path option passed " ) ;
fs : : path fs_path = fs : : weakly_canonical ( metadata_path ) ;
if ( ! fs : : exists ( fs_path ) )
{
std : : cerr < < fmt : : format ( " Data path ({}) does not exist " , fs_path . string ( ) ) ;
return 1 ;
}
String root_path ;
2021-09-03 21:43:15 +00:00
auto test_mode = options . contains ( " test-mode " ) ;
if ( test_mode )
2021-08-29 14:18:04 +00:00
{
if ( options . count ( " url " ) )
2021-09-08 17:22:24 +00:00
root_path = options [ " url " ] . as < std : : string > ( ) ;
2021-08-29 14:18:04 +00:00
else
throw Exception ( ErrorCodes : : BAD_ARGUMENTS , " No url option passed for test mode " ) ;
}
else
{
if ( options . count ( " output-dir " ) )
root_path = options [ " output-dir " ] . as < std : : string > ( ) ;
else
root_path = fs : : current_path ( ) ;
}
2022-02-09 00:50:50 +00:00
processTableFiles ( fs_path , root_path , test_mode , options . count ( " link " ) ) ;
2021-08-29 14:18:04 +00:00
return 0 ;
}
catch ( . . . )
{
2022-05-23 23:38:17 +00:00
std : : cerr < < DB : : getCurrentExceptionMessage ( false ) < < ' \n ' ;
2021-08-29 14:18:04 +00:00
return 1 ;
}