Merge pull request #14578 from ClickHouse/retries_in_s3_downloader

Retries in s3 downloader
commit 4aad57de87
Author: alesapin (committed by GitHub)
Date: 2020-09-08 12:53:09 +03:00
2 changed files with 68 additions and 38 deletions

Changed file 1 of 2:

@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 import os
 import sys
+import time
 import tarfile
 import logging
 import argparse
@@ -16,6 +17,8 @@ AVAILABLE_DATASETS = {
     'visits': 'visits_v1.tar',
 }
 
+RETRIES_COUNT = 5
+
 def _get_temp_file_name():
     return os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
@@ -24,25 +27,37 @@ def build_url(base_url, dataset):
 def dowload_with_progress(url, path):
     logging.info("Downloading from %s to temp path %s", url, path)
-    with open(path, 'w') as f:
-        response = requests.get(url, stream=True)
-        response.raise_for_status()
-        total_length = response.headers.get('content-length')
-        if total_length is None or int(total_length) == 0:
-            logging.info("No content-length, will download file without progress")
-            f.write(response.content)
-        else:
-            dl = 0
-            total_length = int(total_length)
-            logging.info("Content length is %ld bytes", total_length)
-            for data in response.iter_content(chunk_size=4096):
-                dl += len(data)
-                f.write(data)
-                if sys.stdout.isatty():
-                    done = int(50 * dl / total_length)
-                    percent = int(100 * float(dl) / total_length)
-                    sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50-done), percent))
-                    sys.stdout.flush()
+    for i in range(RETRIES_COUNT):
+        try:
+            with open(path, 'w') as f:
+                response = requests.get(url, stream=True)
+                response.raise_for_status()
+                total_length = response.headers.get('content-length')
+                if total_length is None or int(total_length) == 0:
+                    logging.info("No content-length, will download file without progress")
+                    f.write(response.content)
+                else:
+                    dl = 0
+                    total_length = int(total_length)
+                    logging.info("Content length is %ld bytes", total_length)
+                    for data in response.iter_content(chunk_size=4096):
+                        dl += len(data)
+                        f.write(data)
+                        if sys.stdout.isatty():
+                            done = int(50 * dl / total_length)
+                            percent = int(100 * float(dl) / total_length)
+                            sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50-done), percent))
+                            sys.stdout.flush()
+            break
+        except Exception as ex:
+            sys.stdout.write("\n")
+            time.sleep(3)
+            logging.info("Exception while downloading %s, retry %s", ex, i + 1)
+            if os.path.exists(path):
+                os.remove(path)
+    else:
+        raise Exception("Cannot download dataset from {}, all retries exceeded".format(url))
     sys.stdout.write("\n")
     logging.info("Downloading finished")
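
The new control flow in this hunk (applied identically in both changed files) is Python's for/else: the else branch of a loop runs only when the loop finishes without hitting break, so the final raise fires exactly when all RETRIES_COUNT attempts have failed. A minimal self-contained sketch of that shape (the download_with_retries name and its fetch parameter are illustrative, not part of the commit; the constant and the 3-second sleep come from the diff):

import logging
import os
import time

RETRIES_COUNT = 5  # same constant the commit introduces

def download_with_retries(url, path, fetch):
    # fetch: any callable that downloads url into path (hypothetical parameter)
    for i in range(RETRIES_COUNT):
        try:
            fetch(url, path)
            break  # success: leave the loop, which skips the else branch
        except Exception as ex:
            time.sleep(3)  # fixed 3-second back-off, as in the diff
            logging.info("Exception while downloading %s, retry %s", ex, i + 1)
            if os.path.exists(path):
                os.remove(path)  # discard the partial file before the next attempt
    else:
        # reached only when the loop never hit break: every attempt failed
        raise Exception("Cannot download dataset from {}, all retries exceeded".format(url))

Because response.raise_for_status() converts HTTP 4xx/5xx responses into exceptions, both network failures and server errors land in the except branch and are retried.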

Changed file 2 of 2:

@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 import os
 import sys
+import time
 import tarfile
 import logging
 import argparse
@@ -16,6 +17,8 @@ AVAILABLE_DATASETS = {
     'visits': 'visits_v1.tar',
 }
 
+RETRIES_COUNT = 5
+
 def _get_temp_file_name():
     return os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
@@ -24,25 +27,37 @@ def build_url(base_url, dataset):
 def dowload_with_progress(url, path):
     logging.info("Downloading from %s to temp path %s", url, path)
-    with open(path, 'w') as f:
-        response = requests.get(url, stream=True)
-        response.raise_for_status()
-        total_length = response.headers.get('content-length')
-        if total_length is None or int(total_length) == 0:
-            logging.info("No content-length, will download file without progress")
-            f.write(response.content)
-        else:
-            dl = 0
-            total_length = int(total_length)
-            logging.info("Content length is %ld bytes", total_length)
-            for data in response.iter_content(chunk_size=4096):
-                dl += len(data)
-                f.write(data)
-                if sys.stdout.isatty():
-                    done = int(50 * dl / total_length)
-                    percent = int(100 * float(dl) / total_length)
-                    sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50-done), percent))
-                    sys.stdout.flush()
+    for i in range(RETRIES_COUNT):
+        try:
+            with open(path, 'w') as f:
+                response = requests.get(url, stream=True)
+                response.raise_for_status()
+                total_length = response.headers.get('content-length')
+                if total_length is None or int(total_length) == 0:
+                    logging.info("No content-length, will download file without progress")
+                    f.write(response.content)
+                else:
+                    dl = 0
+                    total_length = int(total_length)
+                    logging.info("Content length is %ld bytes", total_length)
+                    for data in response.iter_content(chunk_size=4096):
+                        dl += len(data)
+                        f.write(data)
+                        if sys.stdout.isatty():
+                            done = int(50 * dl / total_length)
+                            percent = int(100 * float(dl) / total_length)
+                            sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50-done), percent))
+                            sys.stdout.flush()
+            break
+        except Exception as ex:
+            sys.stdout.write("\n")
+            time.sleep(3)
+            logging.info("Exception while downloading %s, retry %s", ex, i + 1)
+            if os.path.exists(path):
+                os.remove(path)
+    else:
+        raise Exception("Cannot download dataset from {}, all retries exceeded".format(url))
     sys.stdout.write("\n")
     logging.info("Downloading finished")
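
For context, here is a standalone approximation of the streaming body that the retry loop wraps. It is a sketch, not the committed code: the dowload typo in the function name is dropped, and the file is opened in binary mode ('wb'), since the diff's text-mode 'w' only accepts the bytes chunks from iter_content under Python 2.

import sys
import logging
import requests

def download_with_progress(url, path):
    # Stream url into path in 4 KiB chunks, drawing a 50-column
    # progress bar when stdout is a terminal (mirrors the diff's body).
    response = requests.get(url, stream=True)
    response.raise_for_status()  # turn HTTP 4xx/5xx into exceptions
    total_length = response.headers.get('content-length')
    with open(path, 'wb') as f:
        if total_length is None or int(total_length) == 0:
            logging.info("No content-length, will download file without progress")
            f.write(response.content)
        else:
            downloaded = 0
            total_length = int(total_length)
            logging.info("Content length is %d bytes", total_length)
            for data in response.iter_content(chunk_size=4096):
                downloaded += len(data)
                f.write(data)
                if sys.stdout.isatty():
                    done = int(50 * downloaded / total_length)
                    percent = int(100 * downloaded / total_length)
                    sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50 - done), percent))
                    sys.stdout.flush()
    sys.stdout.write("\n")
    logging.info("Downloading finished")

Wired together with the earlier sketch, this would be called as download_with_retries(url, path, fetch=download_with_progress).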