Untitled

mail@pastecode.io avatar
unknown
plain_text
2 years ago
3.4 kB
6
Indexable
    def get_flood_zones(
        self,
        sov_rows: List[BasePropertySovInputDTO],
    ) -> List[ActuarialRaterPropertySovInputDTO]:
        """
        Assigns a model zone to each PropertySovInput row in an exposed country.

        Every input row is copied into an ActuarialRaterPropertySovInputDTO.
        Rows whose country is in EXPOSED_COUNTRIES get a ``cat_zone`` and are
        returned, in input order; rows from any other country are left out of
        the result.

        The zone is "<state> <county>" (county with its administrative suffix
        stripped) when that key exists in ``self.cat_rates_map``, otherwise
        "<state> Average".

        NOTE(review): ``state`` is concatenated directly, so an exposed row
        without a state would raise TypeError - confirm that upstream
        validation guarantees it is set.
        """
        # Ordered so that " CITY AND BOROUGH" is tried before the shorter
        # " BOROUGH" it ends with.
        unwanted_suffixes = (
            " CITY AND BOROUGH",
            " COUNTY",
            " DISTRICT",
            " PARISH",
            " BOROUGH",
            " CENSUS AREA",
        )

        #  defining logic to sanitise county name (needed for Ignis county list)
        def get_sanitised_county_name(county_name: str) -> str:
            """
            Strips one trailing administrative suffix from the county name so it
            can be matched against the Cat Rates Table.
            This operation should be removed once the Actuarial Cat Rates Table
            conforms to AIR.
            """
            for suffix in unwanted_suffixes:
                # Only a true suffix is removed; the same words appearing in
                # the middle of a name are part of the name and must stay.
                if county_name.endswith(suffix):
                    return county_name[: -len(suffix)]
            return county_name

        # getting all available Ignis zones
        model_zones = self.cat_rates_map.keys()

        # assigning a zone to each property row:
        flood_sov_rows = []
        for sov_row in sov_rows:
            # The copy is built for every input row, exposed or not, before
            # the country filter is applied.
            flood_sov_row = ActuarialRaterPropertySovInputDTO(**(sov_row.dict()))

            if sov_row.country not in EXPOSED_COUNTRIES:
                continue

            state = sov_row.state
            county = (
                get_sanitised_county_name(sov_row.county)
                if sov_row.county
                else None
            )

            # default to the State Average; upgraded below when the specific
            # state/county zone is known to the rates table
            cat_zone = state + " Average"  # type: ignore
            if county is not None:
                county_zone = state + " " + county  # type: ignore
                if county_zone in model_zones:
                    cat_zone = county_zone

            flood_sov_row.cat_zone = cat_zone
            flood_sov_rows.append(flood_sov_row)

        return flood_sov_rows

    def get_flood_rows(
        self,
        sov_rows: List[ActuarialRaterPropertySovInputDTO],
    ) -> List[FloodRowDTO]:
        """
        Aggregates zoned PropertySovInput rows into one FloodRow per cat zone.

        A FloodRowDTO is created for every distinct ``cat_zone`` (in sorted
        order), then each input row adds its ``tiv_total`` to the matching
        FloodRow. A missing or falsy TIV counts as 0. The FloodRow's country
        is taken from the last input row seen for that zone.
        """
        distinct_zones = {entry.cat_zone for entry in sov_rows}

        # one accumulator per zone, keyed by the zone's string form
        rows_by_zone = {}
        for zone_key in sorted(distinct_zones):  # type: ignore
            rows_by_zone[str(zone_key)] = FloodRowDTO(flood_zone=zone_key)

        for entry in sov_rows:
            bucket: FloodRowDTO = rows_by_zone[str(entry.cat_zone)]
            bucket.tiv_total += entry.tiv_total or 0  # type: ignore
            bucket.country = entry.country

        return [*rows_by_zone.values()]

    def get_exposed_flood_rows(
        self,
        flood_rows: List[FloodRowDTO],
    ) -> List[FloodRowDTO]:
        """
        Keeps only the Flood Rows that are exposed to the Flood Peril.

        Each row's ``us_cat_price`` is set from its zone's cat rate and its
        TIV (this happens for every row, kept or not). A row counts as exposed
        when the resulting price is greater than zero.
        """
        kept = []
        for flood_row in flood_rows:
            # price the row first: exposure is decided from the stored price
            flood_row.us_cat_price = self.get_cat_price(  # type: ignore
                tiv_total=flood_row.tiv_total,
                cat_rate=self.get_cat_rate(flood_row.flood_zone),  # type: ignore
            )
            if flood_row.us_cat_price > 0:
                kept.append(flood_row)

        return kept