Untitled

 avatar
unknown
python
a year ago
5.5 kB
3
Indexable
import re
import yaml, json
from mrjob.job import MRJob
from typing import List, Tuple
from operator import itemgetter


class Source_Mapper:

    columns = None
    with open("schema_mapper.yaml", "r") as stream:
        config = yaml.safe_load(stream)

    def reducer(self, model: str, model_attributes: List) -> Tuple[str, List]:
        yield (model, list(model_attributes))


class Source1_Mapper(Source_Mapper, MRJob):

    def __init__(self, data):
        Source_Mapper.__init__(self)
        MRJob.__init__(self, data)

    def mapper(self, _: None, line: str) -> Tuple[str, List]:
        """Converts strings from csv to single schema format
        Args:
            line(str): string with a feature
            representation of the entity
        Returns:
            key(str): laptop model name
            value(List): list of converted attributes
            whose values ​​satisfy the target schema
        """
        config = Source_Mapper.config["source1"]
        ids = config["ids"]
        column_values = line.split(",")
        if column_values[config["model_name_id"]] == "Модель":
            Source1_Mapper.columns = itemgetter(*([config["model_name_id"]] + ids))(column_values)
        else:
            process_name = column_values[config["process_name_id"]].split(" (")[0][1:]
            cores_number = int(column_values[config["cores_id"]])
            ssd_memory_capacity, type_ = column_values[config["ssd_id"]].split()[1:]
            ssd_memory_capacity = int(ssd_memory_capacity)
            if type_ == "ТБ":
                ssd_memory_capacity *= 1024
            ram_capacity = int(column_values[config["ram_id"]].split()[0])
            screen = column_values[config["screen_id"]].split()[0][1:]
            equipment = column_values[config["equipment_id"]].split(" / ")
            year = int(column_values[config["year_id"]])
            color = column_values[config["color_id"]]
            weight = float(column_values[config["weight_id"]])
            price = int(column_values[config["price_id"]].replace(" ", "").split("р")[0])
            yield (
                column_values[config["model_name_id"]],
                [
                    year,
                    ssd_memory_capacity,
                    ram_capacity,
                    screen,
                    process_name,
                    cores_number,
                    color,
                    equipment,
                    weight,
                    price,
                ],
            )


class Source2_Mapper(Source_Mapper, MRJob):

    def __init__(self, data):
        Source_Mapper.__init__(self)
        MRJob.__init__(self, data)

    def mapper(self, _: None, line: str) -> Tuple[str, List]:
        """csv -> [key, value]
        Converts strings from csv to single schema format
        Args:
            line(str): string with a feature
            representation of the entity
        Returns:
            key(str): laptop model name
            value(List): list of converted attributes
            whose values ​​satisfy the target schema
        """
        config = Source_Mapper.config["source2"]
        ids = config["ids"]
        column_values = line.split(",")
        if column_values[config["model_name_id"]] == "Модель":
            Source2_Mapper.columns = itemgetter(*([config["model_name_id"]] + ids))(column_values)
        else:
            process_name = column_values[config["process_name_id"]]
            cores_number = int(column_values[config["cores_id"]])
            ssd_memory_capacity = int(column_values[config["ssd_id"]].split()[0])
            ram_capacity = int(column_values[config["ram_id"]].split()[0])
            equipment = column_values[config["equipment_id"]].split("; ")
            year = int(column_values[config["year_id"]])
            weight = float(column_values[config["weight_id"]].split()[0])
            price = int(column_values[config["price_id"]].split("р")[0].replace(" ", ""))
            yield (
                column_values[config["model_name_id"]],
                [
                    year,
                    ssd_memory_capacity,
                    ram_capacity,
                    column_values[config["screen_id"]].replace("×", "x"),
                    process_name,
                    cores_number,
                    column_values[config["color_id"]],
                    equipment,
                    weight,
                    price,
                ],
            )


class Schema_Mapper():

    def __init__(self, source_name):
        self.source_name = source_name
        with open("schema_mapper.yaml", "r") as stream:
            self.config = yaml.safe_load(stream)

    def map_source(self):

        data = []
        source_mappers_dict = {"source1": Source1_Mapper, "source2": Source2_Mapper}
        Source_Mapper = source_mappers_dict[self.source_name]([f"{self.source_name}.csv"])
        with Source_Mapper.make_runner() as runner:
            runner.run()
            Source_Mapper.columns = self.config["schema_columns"]
            for model, atributes in Source_Mapper.parse_output(runner.cat_output()):
                new_dict = dict(zip(Source_Mapper.columns, [model] + next(iter(atributes))))
                data.append(new_dict)
        with open(f"{self.source_name}.json", "w", encoding="utf8") as json_file:
            json.dump(data, json_file, ensure_ascii=False, indent=4)

Schema_Mapper("source1").map_source(), Schema_Mapper("source2").map_source()
Editor is loading...
Leave a Comment