Untitled
unknown
plain_text
a year ago
7.0 kB
5
Indexable
import json

from sqlalchemy.orm import Session
from sqlalchemy import (
    and_,
    desc,
    cast,
    Date,
    func,
    case
)

from .models.engine_model import EngineModel
from .models.variant import Variant
from .models.line import MSILLine
from .models.plan import Plan
from .models.part import Part
from .models.shop_configuration import ShopConfiguration
from .models.plan_file_status import PlanFileStatus, PlanFileStatusEnum
from ..repositories.repository import Repository


class PlanRepository(Repository):
    """Data-access helpers for production plans and their lookup tables.

    Every query method opens ``self.session`` as a context manager for the
    duration of the query.
    """

    def __init__(self, session: Session):
        """Store the SQLAlchemy session and bind the repository to ``Plan``."""
        self.session = session
        self.model_type = Plan

    def get_plan_filters(self, shop_id):
        """Return the distinct model/variant/line combinations planned for a shop.

        Args:
            shop_id: Shop whose plans are inspected.

        Returns:
            A list of distinct rows shaped as ``(EngineModel.id,
            EngineModel.name, Variant.name, MSILLine.id, MSILLine.name,
            Variant.id)``. The variant is outer-joined, so both variant
            columns are ``None`` for plans without a matching variant.
        """
        with self.session:
            query = (
                self.session.query(
                    EngineModel.id,
                    EngineModel.name,
                    Variant.name,
                    MSILLine.id,
                    MSILLine.name,
                    Variant.id,
                )
                .select_from(self.model_type)
                .filter(self.model_type.shop_id == shop_id)
                .join(EngineModel, self.model_type.model_id == EngineModel.id)
                .join(Variant, self.model_type.variant_id == Variant.id, isouter=True)
                .join(MSILLine, self.model_type.line_id == MSILLine.id)
            )
            return query.distinct().all()

    def get_models_by_shop_id(self, shop_id):
        """Return ``(EngineModel.id, upper-cased name)`` rows for a shop."""
        with self.session:
            return (
                self.session.query(EngineModel.id, func.upper(EngineModel.name))
                .filter(EngineModel.shop_id == shop_id)
                .all()
            )

    def get_line_by_shop_id(self, shop_id):
        """Return ``(MSILLine.id, upper-cased name)`` rows for a shop."""
        with self.session:
            return (
                self.session.query(MSILLine.id, func.upper(MSILLine.name))
                .filter(MSILLine.shop_id == shop_id)
                .all()
            )

    def get_part_by_shop_id(self, shop_id):
        """Return the parts of a shop.

        Returns:
            Rows shaped as ``(Part.id, upper-cased Part.name,
            Part.variant_id, Part.model_id, Part.line_id)``.
        """
        with self.session:
            return (
                self.session.query(
                    Part.id,
                    func.upper(Part.name),
                    Part.variant_id,
                    Part.model_id,
                    Part.line_id,
                )
                .filter(Part.shop_id == shop_id)
                .all()
            )

    def insert_plan(self, plan_items_need_to_inserted):
        """Insert the given plan items through the inherited ``bulk_insert``."""
        self.bulk_insert(plan_items_need_to_inserted)

    def get_variant_by_shop_id(self, shop_id):
        """Return ``(Variant.id, upper-cased name)`` rows for a shop."""
        with self.session:
            return (
                self.session.query(Variant.id, func.upper(Variant.name))
                .filter(Variant.shop_id == shop_id)
                .all()
            )

    def get_existing_plan(self, part_id, variant_id, model, line, production_date_arr, shift):
        """Look up plans that already exist for a part on the given dates.

        Args:
            part_id: Part the plans belong to.
            variant_id: Variant the plans belong to.
            model: Engine model id.
            line: Line id.
            production_date_arr: Production dates to look for.
            shift: Optional shift; when not ``None`` it is compared as
                ``str(shift)`` against ``Plan.shift``.

        Returns:
            A dict mapping each matching production date to a plan id. If
            several plans share a date, the last row returned wins.
        """
        filters = [
            Plan.variant_id == variant_id,
            Plan.model_id == model,
            Plan.production_date.in_(production_date_arr),
            Plan.line_id == line,
            Plan.part_id == part_id,
        ]
        if shift is not None:
            filters.append(Plan.shift == str(shift))

        with self.session:
            rows = (
                self.session.query(Plan.production_date, Plan.id)
                .filter(*filters)
                .all()
            )
        return {production_date: plan_id for production_date, plan_id in rows}

    def update_the_existing_planned_quantity(self, id, pqty):
        """Set ``planned_quantity`` to ``pqty`` on the plan with the given id and commit."""
        with self.session:
            self.session.query(Plan).filter(Plan.id == id).update(
                {Plan.planned_quantity: pqty}
            )
            self.session.commit()

    def get_plan(self, page_no=1, page_size=20, model_list=None, line_list=None,
                 variant_list=None, status=None, shift=None, from_dt=None,
                 to_dt=None, shop_id=7):
        """Return one page of plans for a shop, newest production date first.

        Args:
            page_no: 1-based page number.
            page_size: Maximum number of rows per page.
            model_list: Optional model ids to restrict to; ``None`` or empty
                means no restriction.
            line_list: Optional line ids to restrict to; ``None`` or empty
                means no restriction.
            variant_list: Optional variant ids to restrict to; ``None`` or
                empty means no restriction.
            status: Optional exact ``Plan.status`` to match.
            shift: Optional exact ``Plan.shift`` to match.
            from_dt: Inclusive lower bound on ``production_date``; only
                applied when ``to_dt`` is also given.
            to_dt: Inclusive upper bound on ``production_date``; only
                applied when ``from_dt`` is also given.
            shop_id: Shop whose plans are returned.

        Returns:
            Rows shaped as ``(Plan.id, MSILLine.id, MSILLine.name,
            EngineModel.id, EngineModel.name, Variant.id, Variant.name,
            Plan.production_date, Plan.shift, Plan.planned_quantity,
            Plan.actual_quantity, Plan.status, Part.id, Part.name)``.
        """
        # The list defaults are None rather than [] so that no list object
        # is shared between calls.
        filters = []
        if model_list:
            filters.append(Plan.model_id.in_(model_list))
        if line_list:
            filters.append(Plan.line_id.in_(line_list))
        if variant_list:
            filters.append(Plan.variant_id.in_(variant_list))
        if status is not None:
            filters.append(Plan.status == status)
        if shift is not None:
            filters.append(Plan.shift == shift)
        if from_dt is not None and to_dt is not None:
            filters.append(
                and_(Plan.production_date >= from_dt, Plan.production_date <= to_dt)
            )
        filters.append(Plan.shop_id == shop_id)

        with self.session:
            # NOTE(review): Variant is inner-joined here but outer-joined in
            # get_plan_filters, so plans without a matching variant are
            # excluded from this listing - confirm that is intended.
            return (
                self.session.query(
                    Plan.id,
                    MSILLine.id,
                    MSILLine.name,
                    EngineModel.id,
                    EngineModel.name,
                    Variant.id,
                    Variant.name,
                    Plan.production_date,
                    Plan.shift,
                    Plan.planned_quantity,
                    Plan.actual_quantity,
                    Plan.status,
                    Part.id,
                    Part.name,
                )
                .select_from(Plan)
                .join(Part, Part.id == Plan.part_id)
                .join(MSILLine, MSILLine.id == Plan.line_id)
                .join(EngineModel, EngineModel.id == Plan.model_id)
                .join(Variant, Variant.id == Plan.variant_id)
                .filter(*filters)
                # Plan.id breaks ties so rows sharing a production_date keep
                # a stable order across pages instead of repeating/skipping.
                .order_by(desc(Plan.production_date), desc(Plan.id))
                .offset((page_no - 1) * page_size)
                .limit(page_size)
                .all()
            )

    def update_plan_file_status(self, plan_status_id, errors, warnings):
        """Record the outcome of processing a plan file and commit it.

        The errors and warnings are stored JSON-encoded. The status becomes
        ``SUCCESS`` when ``errors`` is empty and ``FAILED`` otherwise.

        Args:
            plan_status_id: Id of the ``PlanFileStatus`` row to update.
            errors: JSON-serialisable collection of errors.
            warnings: JSON-serialisable collection of warnings.
        """
        outcome = PlanFileStatusEnum.FAILED if errors else PlanFileStatusEnum.SUCCESS
        with self.session:
            self.session.query(PlanFileStatus).filter(
                PlanFileStatus.id == plan_status_id
            ).update({
                PlanFileStatus.errors: json.dumps(errors),
                PlanFileStatus.warnings: json.dumps(warnings),
                PlanFileStatus.status: outcome.value,
            })
            # Commit regardless of the outcome so both SUCCESS and FAILED
            # results are persisted.
            self.session.commit()

    def get_shop_configuration(self, shop_id):
        """Return ``(is_day, is_shift)`` from the first configuration row of a shop.

        Raises:
            AttributeError: If the shop has no ``ShopConfiguration`` row,
                because ``first()`` then yields ``None``.
        """
        with self.session:
            shop_config = (
                self.session.query(ShopConfiguration)
                .filter(ShopConfiguration.shop_id == shop_id)
                .first()
            )
            return shop_config.is_day, shop_config.is_shift

    def get_all_shops(self):
        """Return ``(id, shop_id, is_day, is_shift)`` for every shop configuration."""
        with self.session:
            return self.session.query(
                ShopConfiguration.id,
                ShopConfiguration.shop_id,
                ShopConfiguration.is_day,
                ShopConfiguration.is_shift,
            ).all()
Editor is loading...
Leave a Comment