import os
import json
from concurrent.futures import ThreadPoolExecutor
from time import time, sleep as timesleep
from datetime import datetime as dt
from dateutil.relativedelta import relativedelta
from requests import get, Response
import argparse
import geopandas as gpd
from pydantic import BaseModel, Field, PrivateAttr, validator
from typing import Optional, List, Literal, Union, Type, Dict
from uuid import uuid4, UUID
from enum import Enum
from pathlib import Path
from termcolor import colored

# constants
BASE_URL = "http://185.226.13.104"


# helper function
def create_fld_if_not_exist(path: Path):
    if not path.exists():
        path.mkdir(parents=True, exist_ok=True)


# base classes
class BaseModelSetter(BaseModel):
    class Config:
        allow_mutation = True


class BaseModelPartialEQ(BaseModel):
    def __eq__(self, other: BaseModel):
        # partial equality: compare only the fields that are set (non-None) on self
        if not isinstance(other, BaseModel):
            return NotImplemented
        return all(sv == other.__dict__.get(sk) for sk, sv in self.__dict__.items() if sv is not None)


# schema
class Attrib(BaseModelSetter, BaseModelPartialEQ):
    href: Optional[str]
    rel: Optional[str]
    type: Optional[str]


class Tag(BaseModelSetter, BaseModelPartialEQ):
    tag: str
    text: Optional[str]
    attrib: Optional[Attrib]


class Entry(BaseModelSetter, BaseModelPartialEQ):
    entry: List[Tag] = Field(default_factory=list)


class Feed(BaseModelSetter):
    entries: List[Entry] = Field(default_factory=list)
    head: Optional[List[Tag]] = Field(default_factory=list)
    totalResults: int = Field(default=0)


# schema
class JsonCCSISchema(BaseModelSetter):
    feed: Optional[Feed]
    _level: Literal['entries', 'tags'] = PrivateAttr(default='entries')

    def __iter__(self):
        """Iterate over the selected level ('entries' or 'tags')."""
        for entry in self.feed.entries:
            if self._level == 'entries':
                yield entry
            for tag in entry.entry:
                if self._level == 'tags':
                    yield tag

    @validator('feed', pre=True)
    def unpack_feed(cls, value):
        if isinstance(value, list):
            return value[0]
        elif isinstance(value, Feed):
            return value
        return value

    def __next__(self):
        return self

    def __call__(self, level: Literal['entries', 'tags'] = 'tags'):
        self._level = level
        return self


# parser and download
class STATUS(Enum):
    READY = 1
    PENDING = 2
    FAILED = 3
    TOO_MUCH_REQUESTS = 4


# resource dataclass
class Resource(BaseModelSetter):
    link: Optional[str]
    title: Optional[Union[str, UUID]]
    response: Optional[Response] = Field(default=None)
    status: Optional[STATUS] = Field(default=STATUS.PENDING)
    index: int

    @validator('title', pre=True)
    def set_title(cls, title):
        if not title:
            return uuid4()
        return title

    class Config:
        arbitrary_types_allowed = True


class Parser:
    def __call__(self, feeds: JsonCCSISchema) -> List[Resource]:
        resources = []
        for index, entry in enumerate(feeds):
            link, title = None, None
            for tag in entry.entry:
                if tag.tag == 'link' and tag.attrib and tag.attrib.rel == 'enclosure':
                    link = tag.attrib.href
                elif tag.tag == 'title':
                    title = tag.text
            resources.append(Resource(link=link, title=title, index=index))
        return resources


# request and download
class CCSIRequester(BaseModel):
    resource: str
    params: Optional[Union[dict, BaseModel]]
    schemas: Type[JsonCCSISchema]
    parser: Parser
    records: List[Resource] = Field(default_factory=list)

    @validator('params', pre=True)
    def set_params(cls, value):
        if isinstance(value, BaseModel):
            return value.dict(by_alias=True)
        elif isinstance(value, dict):
            return value

    def run(self) -> List[Resource]:
        response = self.send_request()
        feed = self.parse_response(response)
        self.parse_feed(feed)
        self.get_next(feed)
        return self.records

    def parse_feed(self, feed: JsonCCSISchema):
        self.records += self.parser(feed)

    def parse_response(self, response: Response) -> JsonCCSISchema:
        return self.schemas(feed=response.json())

    def get_next(self, feed: JsonCCSISchema) -> None:
        # stop once all records of the paged feed have been collected
        if len(self.records) == feed.feed.totalResults:
            return None
        next_link = None
        for tag in feed.feed.head:
            if Tag(tag='link', attrib=Attrib(rel='next')) == tag:
                next_link = tag
        if next_link is None:
            return None
        response = get(url=next_link.attrib.href)
        feed = self.parse_response(response)
        self.parse_feed(feed)
        self.get_next(feed)

    def send_request(self):
        response = get(url=f"{BASE_URL}/{self.resource}/json/search?", params=self.params)
        print(response.url)
        if response.status_code != 200:
            raise Exception(f"ccsi request {response.url} failed")
        return response

    class Config:
        arbitrary_types_allowed = True


def request_resource(resource: Resource) -> Resource:
    print(f' {resource.index} requested from {resource.link}')
    resource.response = get(resource.link, allow_redirects=True, stream=True)
    return resource


def resolve_status(resource: Resource) -> Resource:
    if resource.response.status_code == 200:
        print(colored(f' {resource.index} requested from {resource.link} : data ready', 'green'))
        resource.status = STATUS.READY
    elif resource.response.status_code == 201:
        print(colored(f' {resource.index} requested from {resource.link} : data pending', 'blue'))
        resource.status = STATUS.PENDING
    elif resource.response.status_code == 429:
        print(colored(f' {resource.index} requested from {resource.link} : too many requests', 'red'))
        resource.status = STATUS.TOO_MUCH_REQUESTS
    else:
        resource.status = STATUS.FAILED
        print(colored(f' {resource.index} requested from {resource.link} : failed', 'red'))
    return resource


def download(path: Path, resource: Resource):
    print(colored(f' {resource.index} requested from {resource.link} : data download start', 'green'))
    # title may be a UUID, so cast it to str before building the file path
    with open(path / str(resource.title), 'wb') as fd:
        for chunk in resource.response.iter_content(chunk_size=8192):
            fd.write(chunk)
    print(colored(f' {resource.index} requested from {resource.link} : data download end', 'green'))


class Downloader(BaseModel):
    pool: List[Resource]
    path: Path
    sleep: int = Field(default=200)
    sleep_step: int = Field(default=5)
    timeout: int = Field(default=12 * 60)

    def run(self):
        with ThreadPoolExecutor(max_workers=20) as executor:
            executor.map(self.request_data, self.pool)

    def request_data(self, resource: Resource) -> None:
        elapsed = 0
        # poll until the resource is ready, fails, or the timeout is reached
        while resource.status in [STATUS.PENDING, STATUS.TOO_MUCH_REQUESTS] and elapsed <= self.timeout:
            request_resource(resource)
            resolve_status(resource)
            if resource.status == STATUS.READY:
                download(self.path, resource)
                break
            elif resource.status == STATUS.TOO_MUCH_REQUESTS or resource.status == STATUS.PENDING:
                print(f' {resource.index} requested from {resource.link} : sleep for {self.sleep} s')
                timesleep(self.sleep)
                elapsed += self.sleep
            elif resource.status == STATUS.FAILED:
                break

    class Config:
        arbitrary_types_allowed = True


# user input
class UserInput(BaseModel):
    Start: str = Field(alias='timeStart')
    End: str = Field(alias='timeEnd')
    ID: Optional[str]
    Output: Path
    Geometry: str = Field(alias='bbox')
    Resources: Dict[str, Dict[str, str]]

    @validator('Output', 'Geometry', pre=True)
    def set_output(cls, value):
        return Path(value)

    @validator('Geometry', pre=True)
    def set_bbox(cls, value) -> str:
        # convert the AOI geometry file into a comma-separated bbox string
        return ','.join([str(v) for v in list(gpd.read_file(value).total_bounds)])

    class Config:
        allow_population_by_field_name = True


# CCSI params settings
# The following classes define the CCSI params and can be extended for conversion and validation.
class PageingSetting(BaseModel):
    maxRecords: str = Field(default='50')
    startIndex: str = Field(default='0')


class WekeoS2Input(PageingSetting):
    processingLevel: str
    bbox: str
    timeStart: str
    timeEnd: str


class CDSERA5Input(PageingSetting):
    customcamsDataset: str = Field(alias='custom:camsDataset')
    customformat: str = Field(alias='custom:format')
    bbox: str
    timeStart: str
    timeEnd: str

    class Config:
        allow_population_by_field_name = True


class ONDAS3Input(PageingSetting):
    productType: str
    bbox: str
    timeStart: str
    timeEnd: str


class WekeoCLMSInput(PageingSetting):
    productType: str


RESOURCES = {'wekeo_s2': WekeoS2Input,
             'cds_era5': CDSERA5Input,
             'onda_s3': ONDAS3Input,
             'wekeo_clms': WekeoCLMSInput}


if __name__ == "__main__":
    # The CLI was changed slightly for testing, but it should not be a problem to adapt it as needed.
    cli = argparse.ArgumentParser(description="This script produces LST of App 1")
    cli.add_argument("-c", "--City", type=str, metavar="", required=True,
                     help="City name (Berlin, Copenhagen, Heraklion, Sofia)")
    cli.add_argument("-s", "--Start", type=str, metavar="", required=True,
                     help="Start Date e.g. 2019-01-01")
    cli.add_argument("-e", "--End", type=str, metavar="", required=True,
                     help="End Date e.g. 2019-01-19")
    cli.add_argument("-i", "--ID", type=str, metavar="", required=True,
                     help="Order/Run ID")
    cli.add_argument("-o", "--Output", type=str, metavar="", required=True,
                     help="Path to the output directory")
    cli.add_argument("-g", "--Geometry", type=str, metavar="", required=True,
                     help="Path to the geometry with AOI (geojsons)")
    cli.add_argument("-r", "--Resources", type=str, metavar="", required=True,
                     help="Dictionary of resources")
    args = cli.parse_args()

    # test
    # args = cli.parse_args(['--Start', '2020-03-01',
    #                        '--End', '2020-03-31',
    #                        '--Output', '/home/schmid/Desktop/test',
    #                        '--Geometry', '/media/schmid/One Touch1/Documents/WORK/Projects/Cure/cities/Heraklion.geojson',
    #                        '--Resources', '{\"onda_s3\": {\"productType\": \"rbt\"},'
    #                                       '\"wekeo_s2\": {\"processingLevel\": \"level2a\"},'
    #                                       '\"cds_era5\": {\"customcamsDataset\": \"total_column_water_vapour,10m_v_component_of_wind\", \"customformat\": \"grib\"}}',
    #                        '--ID', '123456789'])

    start = time()

    # validation of user input
    args.Resources = json.loads(args.Resources)
    print(args.Resources)
    geom_base_directory = args.Geometry
    # geometries = {"Berlin": os.path.join(geom_base_directory, "Berlin.geojson"),
    #               "Copenhagen": os.path.join(geom_base_directory, "Copenhagen.geojson"),
    #               "Heraklion": os.path.join(geom_base_directory, "Heraklion.geojson"),
    #               "Sofia": os.path.join(geom_base_directory, "Sofia.geojson")}
    # args.Geometry = geometries[args.City]
    args.Geometry = os.path.join(geom_base_directory, f"{args.City}.geojson")
    user_input = UserInput(**vars(args))

    for resource, extra_params in user_input.Resources.items():
        print(resource)
        output_directory = user_input.Output / resource
        create_fld_if_not_exist(output_directory)

        # setting CCSI params and requesting the data
        extra_params.update(user_input.dict(by_alias=True))

        # App1 and App10 need two months of S2 data even when the start date equals the end date
        start_date = extra_params["timeStart"]
        if resource == "wekeo_s2" and extra_params["timeStart"] == extra_params["timeEnd"]:
            new_start_date = dt.strptime(start_date, "%Y-%m-%d") + relativedelta(months=-2)
            extra_params["timeStart"] = str(new_start_date.date())

        params = RESOURCES.get(resource)(**extra_params)
        requester = CCSIRequester(resource=resource, params=params, schemas=JsonCCSISchema, parser=Parser())
        resource_data = requester.run()

        # download
        downloader = Downloader(pool=resource_data, path=output_directory, sleep=8 * 60, timeout=12 * 60)
        downloader.run()

    end = time()
    print(f'Process time: {end - start} s')

    first_wvp_date = start_date.replace("-", "") + "T000000"
    print(first_wvp_date)
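
As an aside, the interplay of the schema classes and Parser can be exercised with a hand-made feed. The snippet below is a minimal sketch, assuming the classes defined above are in scope; the tag values and URL are made up for illustration and are not real CCSI output.

# Minimal, hand-made feed: one entry carrying a title tag and an enclosure link (illustrative values only).
feed = JsonCCSISchema(feed=Feed(
    totalResults=1,
    entries=[Entry(entry=[
        Tag(tag='title', text='S2A_MSIL2A_example'),
        Tag(tag='link', attrib=Attrib(rel='enclosure', href='http://example.org/data.zip')),
    ])],
))

# Parser walks the feed at the 'entries' level and extracts (link, title) pairs.
resources = Parser()(feed('entries'))
print(resources[0].link, resources[0].title)  # -> http://example.org/data.zip S2A_MSIL2A_example

This is the same path CCSIRequester.parse_feed takes to build its records list from each paged response before the Downloader polls the links and writes the files to disk.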