Browse Source

sat retry dl refactor, cf #4

master
Olivier Courtin 2 months ago
parent
commit
95bd6c9ef1
1 changed files with 42 additions and 37 deletions
  1. +42
    -37
      neat_eo/tools/sat.py

+ 42
- 37
neat_eo/tools/sat.py View File

@@ -1,6 +1,7 @@
import os
import re
import sys
import time
import json
import glob

@@ -121,13 +122,39 @@ def search_scenes(args, log):
return scenes


def scene_already_dl(scene, out):
def is_scene_already_dl(scene, out):
scene_dir = os.path.join(out, scene["dir"][:42]) # 42 related to Theia MD issue, dirty workaround
scene_dir = "./" + scene_dir if not os.path.isabs(scene_dir) else scene_dir
dl = True if glob.glob(scene_dir + "*") else False
return dl


def dl_scene(scene, out, login, password, timeout):

try:
fp = open(out, "wb")
assert fp, "Unable to write in {}".format(out)

position = int(threading.currentThread().getName().replace("ThreadPoolExecutor-1_", "")) + 1
url = THEIA_URL + "/resto2/collections/SENTINEL2/{}/download/?issuerId=theia".format(scene["uuid"])
headers = {"Authorization": "Bearer {}".format(get_token(login, password))}
resp = requests.get(url, headers=headers, stream=True, timeout=timeout)
length = int(resp.headers["Content-Length"])
progress = tqdm(unit="B", position=position, unit_scale=True, ascii=True, desc=scene["uuid"], total=length)
for chunk in resp.iter_content(chunk_size=16384):
fp.write(chunk)
progress.update(16384)

fp.close()

except:
if os.path.isfile(out):
os.remove(out)
return False

return True


def main(args):
assert args.cover or args.granules or args.scenes, "Either --cover OR --granules OR --scenes is mandatory"
assert not (args.download and not args.out), "--download implies out parameter"
@@ -151,56 +178,34 @@ def main(args):

if args.download:

report = []
assert "auth" in config.keys() and "theia" in config["auth"].keys(), "No Theia auth defined in config file"
login, password = dict([auth.split("=") for auth in config["auth"]["theia"].split(" ")]).values()

log.log("--------------------------------------------------------------------------")
dl = tqdm(unit="scene", desc="Download", ascii=True, total=len(scenes), position=0)
for scene in scenes:
if scene_already_dl(scene, args.out):
if is_scene_already_dl(scene, args.out):
dl.update()

with futures.ThreadPoolExecutor(args.workers) as executor:

def worker(scene):

if scene_already_dl(scene, args.out):
if is_scene_already_dl(scene, args.out):
return scene, None, True

url = THEIA_URL + "/resto2/collections/SENTINEL2/{}/download/?issuerId=theia".format(scene["uuid"])
zip_path = os.path.join(args.out, scene["uuid"] + ".zip")

position = int(threading.currentThread().getName().replace("ThreadPoolExecutor-1_", "")) + 1
try:
fp = open(zip_path, "wb")
assert fp, "Unable to write in {}".format(zip_path)

headers = {"Authorization": "Bearer {}".format(get_token(login, password))}
resp = requests.get(url, headers=headers, stream=True, timeout=args.timeout)
length = int(resp.headers["Content-Length"])
progress = tqdm(
unit="B", position=position, unit_scale=True, ascii=True, desc=scene["uuid"], total=length
)
for chunk in resp.iter_content(chunk_size=16384):
fp.write(chunk)
progress.update(16384)

except: # Auth token time validity handling
try:
fp = open(zip_path, "wb")
assert fp, "Unable to write in {}".format(zip_path)

headers = {"Authorization": "Bearer {}".format(get_token(login, password))}
resp = requests.get(url, headers=headers, stream=True, timeout=args.timeout)
progress.reset()
for chunk in resp.iter_content(chunk_size=16384):
fp.write(chunk)
progress.update(16384)
except:
return scene, None, False

return scene, zip_path, True
out = os.path.join(args.out, scene["uuid"] + ".zip")

if dl_scene(scene, out, login, password, args.timeout):
return scene, out, True
time.sleep(15) # retry #1
if dl_scene(scene, out, login, password, args.timeout):
return scene, out, True
time.sleep(15) # retry #2
if dl_scene(scene, out, login, password, args.timeout):
return scene, out, True

return scene, None, False

for scene, zip_path, ok in executor.map(worker, scenes):
if zip_path and md5(zip_path) == scene["checksum"]:


Loading…
Cancel
Save