Untitled

 avatar
unknown
plain_text
a year ago
7.0 kB
5
Indexable
from sqlalchemy.orm import Session
from sqlalchemy import (
    and_,
    desc,
    cast,
    Date,
    func,
    case    
) 
from .models.engine_model import EngineModel
from .models.variant import Variant
from .models.line import MSILLine
from .models.plan import Plan
from .models.part import Part
from .models.shop_configuration import ShopConfiguration
from .models.plan_file_status import PlanFileStatus,PlanFileStatusEnum
from ..repositories.repository import Repository
import json

class PlanRepository(Repository):
    def __init__(self, session: Session):
        self.session = session
        self.model_type = Plan

    def get_plan_filters(self, shop_id):
        with self.session:
            query = (self.session.query(EngineModel.id,EngineModel.name, Variant.name, MSILLine.id,MSILLine.name,Variant.id)
            .select_from(self.model_type)
            .filter(self.model_type.shop_id == shop_id)
            .join(EngineModel, self.model_type.model_id == EngineModel.id)
            .join(Variant, self.model_type.variant_id == Variant.id,isouter=True)
            .join(MSILLine, self.model_type.line_id == MSILLine.id)
            )

        return query.distinct().all()
    
    def get_models_by_shop_id(self,shop_id):
        with self.session:
            return self.session.query(EngineModel.id,func.upper(EngineModel.name)).filter(EngineModel.shop_id==shop_id).all()

    def get_line_by_shop_id(self,shop_id):
        with self.session:
            return self.session.query(MSILLine.id,func.upper(MSILLine.name)).filter(MSILLine.shop_id == shop_id).all()

    def get_part_by_shop_id(self,shop_id):
        with self.session:
            return self.session.query(Part.id,
                                      func.upper(Part.name),
                                      Part.variant_id,
                                      Part.model_id,
                                      Part.line_id).filter(Part.shop_id == shop_id).all()

    def insert_plan(self,plan_items_need_to_inserted):
        self.bulk_insert(plan_items_need_to_inserted)

    def get_variant_by_shop_id(self,shop_id):
        with self.session:
            return self.session.query(Variant.id,
                                      func.upper(Variant.name)).filter(Variant.shop_id == shop_id).all()
                
    def get_existing_plan(self,part_id,variant_id,model,line,production_date_arr,shift):
        with self.session:
            filters = []
            filters.append(Plan.variant_id == variant_id)
            filters.append(Plan.model_id==model)
            filters.append(Plan.production_date.in_(production_date_arr))
            filters.append(Plan.line_id==line)
            filters.append(Plan.part_id == part_id)
            if(shift != None):
                filters.append(Plan.shift == str(shift))
            
            query = self.session.query(Plan.production_date,Plan.id).filter(*filters).all() 
            dates = {}
            for each in query:
                date,plan_id = each
                dates[date] = plan_id
            return dates
        
    def update_the_existing_planned_quantity(self,id,pqty):
        with self.session:
            self.session.query(Plan).filter(Plan.id == id).update({Plan.planned_quantity:pqty})
            self.session.commit()

    def get_plan(self,
                 page_no=1,
                 page_size=20,
                 model_list=[],
                 line_list=[],
                 variant_list=[],
                 status=None,
                 shift=None,
                 from_dt=None,
                 to_dt=None,
                 shop_id=7):
        filters = []

        if len(model_list) != 0:
            filters.append(Plan.model_id.in_(model_list))

        if len(line_list) != 0:
            filters.append(Plan.line_id.in_(line_list))

        if len(variant_list) != 0:
            filters.append(Plan.variant_id.in_(variant_list))
        
        if status != None:
            filters.append(Plan.status == status)
        
        if shift != None:
            filters.append(Plan.shift == shift)
        
        if from_dt != None and to_dt != None:
            filters.append(and_(Plan.production_date >= from_dt,Plan.production_date <= to_dt))
        
        filters.append(Plan.shop_id == shop_id)

        with self.session:
            return self.session.query(
                Plan.id,
                MSILLine.id,
                MSILLine.name,
                EngineModel.id,
                EngineModel.name,
                Variant.id,
                Variant.name,
                Plan.production_date,
                Plan.shift,
                Plan.planned_quantity,
                Plan.actual_quantity,
                Plan.status,
                Part.id,
                Part.name
            ).select_from(Plan)\
            .join(Part,Part.id == Plan.part_id)\
            .join(MSILLine, MSILLine.id == Plan.line_id)\
            .join(EngineModel, EngineModel.id == Plan.model_id)\
            .join(Variant, Variant.id == Plan.variant_id)\
            .filter(*filters)\
            .order_by(desc(Plan.production_date))\
            .offset((page_no - 1) * page_size)\
            .limit(page_size)\
            .all()
        

    def update_plan_file_status(self,plan_status_id,errors,warnings):
        with self.session:
            if len(errors) == 0:
                self.session.query(PlanFileStatus).filter(PlanFileStatus.id == plan_status_id)\
                                                .update({PlanFileStatus.errors : json.dumps(errors),
                                                        PlanFileStatus.warnings : json.dumps(warnings),
                                                        PlanFileStatus.status : PlanFileStatusEnum.SUCCESS.value})

            else:
                self.session.query(PlanFileStatus).filter(PlanFileStatus.id == plan_status_id)\
                                                .update({PlanFileStatus.errors : json.dumps(errors),
                                                        PlanFileStatus.warnings : json.dumps(warnings),
                                                        PlanFileStatus.status : PlanFileStatusEnum.FAILED.value})\


            self.session.commit()

    def get_shop_configuration(self,shop_id):
        with self.session:
            shop_config =self.session.query(ShopConfiguration)\
                .filter(ShopConfiguration.shop_id==shop_id)\
                .first()

            return shop_config.is_day,shop_config.is_shift

    def get_all_shops(self):
        with self.session:
            return self.session.query(
                ShopConfiguration.id,
                ShopConfiguration.shop_id,
                ShopConfiguration.is_day,
                ShopConfiguration.is_shift
            ).all()

Editor is loading...
Leave a Comment