Untitled
unknown
plain_text
2 years ago
7.0 kB
6
Indexable
from sqlalchemy.orm import Session
from sqlalchemy import (
and_,
desc,
cast,
Date,
func,
case
)
from .models.engine_model import EngineModel
from .models.variant import Variant
from .models.line import MSILLine
from .models.plan import Plan
from .models.part import Part
from .models.shop_configuration import ShopConfiguration
from .models.plan_file_status import PlanFileStatus,PlanFileStatusEnum
from ..repositories.repository import Repository
import json
class PlanRepository(Repository):
    """Repository for ``Plan`` rows and the lookup tables used to build plans.

    Every query method runs inside ``with self.session:``, i.e. it uses the
    SQLAlchemy session as a context manager around the database work.
    """

    def __init__(self, session: Session):
        """Store *session* and select ``Plan`` as this repository's model."""
        self.session = session
        self.model_type = Plan

    def get_plan_filters(self, shop_id):
        """Return the distinct model / variant / line combinations of a shop's plans.

        Args:
            shop_id: Value matched against the plan's ``shop_id`` column.

        Returns:
            A list of distinct rows shaped as ``(EngineModel.id,
            EngineModel.name, Variant.name, MSILLine.id, MSILLine.name,
            Variant.id)``. ``Variant`` is outer-joined, so its two columns
            are ``None`` for plans without a matching variant.
        """
        with self.session:
            query = (
                self.session.query(
                    EngineModel.id,
                    EngineModel.name,
                    Variant.name,
                    MSILLine.id,
                    MSILLine.name,
                    Variant.id,
                )
                .select_from(self.model_type)
                .filter(self.model_type.shop_id == shop_id)
                .join(EngineModel, self.model_type.model_id == EngineModel.id)
                .join(
                    Variant,
                    self.model_type.variant_id == Variant.id,
                    isouter=True,
                )
                .join(MSILLine, self.model_type.line_id == MSILLine.id)
            )
            # Execute while the session context is still open.
            return query.distinct().all()

    def get_models_by_shop_id(self, shop_id):
        """Return ``(id, UPPER(name))`` rows for every engine model of *shop_id*."""
        with self.session:
            return (
                self.session.query(EngineModel.id, func.upper(EngineModel.name))
                .filter(EngineModel.shop_id == shop_id)
                .all()
            )

    def get_line_by_shop_id(self, shop_id):
        """Return ``(id, UPPER(name))`` rows for every line of *shop_id*."""
        with self.session:
            return (
                self.session.query(MSILLine.id, func.upper(MSILLine.name))
                .filter(MSILLine.shop_id == shop_id)
                .all()
            )

    def get_part_by_shop_id(self, shop_id):
        """Return the parts of *shop_id*.

        Returns:
            A list of rows shaped as ``(id, UPPER(name), variant_id,
            model_id, line_id)``.
        """
        with self.session:
            return (
                self.session.query(
                    Part.id,
                    func.upper(Part.name),
                    Part.variant_id,
                    Part.model_id,
                    Part.line_id,
                )
                .filter(Part.shop_id == shop_id)
                .all()
            )

    def insert_plan(self, plan_items_need_to_inserted):
        """Insert the given plan items via ``self.bulk_insert``.

        ``bulk_insert`` is not defined in this class; presumably it is
        inherited from ``Repository`` -- verify against the base class.
        """
        self.bulk_insert(plan_items_need_to_inserted)

    def get_variant_by_shop_id(self, shop_id):
        """Return ``(id, UPPER(name))`` rows for every variant of *shop_id*."""
        with self.session:
            return (
                self.session.query(Variant.id, func.upper(Variant.name))
                .filter(Variant.shop_id == shop_id)
                .all()
            )

    def get_existing_plan(self, part_id, variant_id, model, line,
                          production_date_arr, shift):
        """Look up already-stored plans for one part/variant/model/line.

        Args:
            part_id: Matched against ``Plan.part_id``.
            variant_id: Matched against ``Plan.variant_id``.
            model: Matched against ``Plan.model_id``.
            line: Matched against ``Plan.line_id``.
            production_date_arr: Production dates to look for (``IN`` filter).
            shift: Optional shift; when not ``None`` it is converted with
                ``str()`` and matched against ``Plan.shift``.

        Returns:
            A dict mapping each found ``production_date`` to its plan id.
            If several plans share a date, the last row returned wins.
        """
        filters = [
            Plan.variant_id == variant_id,
            Plan.model_id == model,
            Plan.production_date.in_(production_date_arr),
            Plan.line_id == line,
            Plan.part_id == part_id,
        ]
        if shift is not None:
            filters.append(Plan.shift == str(shift))

        with self.session:
            rows = (
                self.session.query(Plan.production_date, Plan.id)
                .filter(*filters)
                .all()
            )
            return {production_date: plan_id for production_date, plan_id in rows}

    def update_the_existing_planned_quantity(self, id, pqty):
        """Set ``planned_quantity`` of the plan with primary key *id* and commit.

        The parameter name ``id`` shadows the builtin; it is kept so that
        existing keyword callers keep working.
        """
        with self.session:
            self.session.query(Plan).filter(Plan.id == id).update(
                {Plan.planned_quantity: pqty}
            )
            self.session.commit()

    def get_plan(self,
                 page_no=1,
                 page_size=20,
                 model_list=None,
                 line_list=None,
                 variant_list=None,
                 status=None,
                 shift=None,
                 from_dt=None,
                 to_dt=None,
                 shop_id=7):
        """Return one page of plans for a shop, newest production date first.

        Args:
            page_no: 1-based page number.
            page_size: Maximum number of rows per page.
            model_list: Optional model ids to restrict to; empty/``None``
                means no restriction.
            line_list: Optional line ids to restrict to; empty/``None``
                means no restriction.
            variant_list: Optional variant ids to restrict to; empty/``None``
                means no restriction.
            status: Optional exact match on ``Plan.status``.
            shift: Optional exact match on ``Plan.shift``.
            from_dt: Inclusive lower bound on ``production_date``; only
                applied when ``to_dt`` is also given.
            to_dt: Inclusive upper bound on ``production_date``; only
                applied when ``from_dt`` is also given.
            shop_id: Shop to query. The default of 7 is kept for backward
                compatibility; its meaning is not visible in this file.

        Returns:
            A list of rows shaped as ``(Plan.id, MSILLine.id, MSILLine.name,
            EngineModel.id, EngineModel.name, Variant.id, Variant.name,
            production_date, shift, planned_quantity, actual_quantity,
            status, Part.id, Part.name)``.
        """
        filters = []
        if model_list:
            filters.append(Plan.model_id.in_(model_list))
        if line_list:
            filters.append(Plan.line_id.in_(line_list))
        if variant_list:
            filters.append(Plan.variant_id.in_(variant_list))
        if status is not None:
            filters.append(Plan.status == status)
        if shift is not None:
            filters.append(Plan.shift == shift)
        if from_dt is not None and to_dt is not None:
            filters.append(
                and_(Plan.production_date >= from_dt,
                     Plan.production_date <= to_dt)
            )
        filters.append(Plan.shop_id == shop_id)

        with self.session:
            # NOTE(review): Variant is inner-joined here but outer-joined in
            # get_plan_filters, so plans without a variant are listed there
            # yet never returned here -- confirm this is intended.
            return (
                self.session.query(
                    Plan.id,
                    MSILLine.id,
                    MSILLine.name,
                    EngineModel.id,
                    EngineModel.name,
                    Variant.id,
                    Variant.name,
                    Plan.production_date,
                    Plan.shift,
                    Plan.planned_quantity,
                    Plan.actual_quantity,
                    Plan.status,
                    Part.id,
                    Part.name,
                )
                .select_from(Plan)
                .join(Part, Part.id == Plan.part_id)
                .join(MSILLine, MSILLine.id == Plan.line_id)
                .join(EngineModel, EngineModel.id == Plan.model_id)
                .join(Variant, Variant.id == Plan.variant_id)
                .filter(*filters)
                # production_date is not unique; the id tie-breaker gives a
                # total order so OFFSET/LIMIT pages neither repeat nor skip
                # rows between requests.
                .order_by(desc(Plan.production_date), desc(Plan.id))
                .offset((page_no - 1) * page_size)
                .limit(page_size)
                .all()
            )

    def update_plan_file_status(self, plan_status_id, errors, warnings):
        """Record the outcome of processing a plan file and commit.

        ``errors`` and ``warnings`` are stored JSON-encoded. The status is
        set to ``SUCCESS`` when ``errors`` is empty and ``FAILED`` otherwise.

        Args:
            plan_status_id: Primary key of the ``PlanFileStatus`` row.
            errors: JSON-serialisable collection of error entries.
            warnings: JSON-serialisable collection of warning entries.
        """
        outcome = PlanFileStatusEnum.FAILED if errors else PlanFileStatusEnum.SUCCESS
        with self.session:
            self.session.query(PlanFileStatus).filter(
                PlanFileStatus.id == plan_status_id
            ).update({
                PlanFileStatus.errors: json.dumps(errors),
                PlanFileStatus.warnings: json.dumps(warnings),
                PlanFileStatus.status: outcome.value,
            })
            # Commit on both the success and the failure path.
            self.session.commit()

    def get_shop_configuration(self, shop_id):
        """Return ``(is_day, is_shift)`` from the first configuration row of *shop_id*.

        Raises:
            AttributeError: If no ``ShopConfiguration`` row matches, because
                ``first()`` then yields ``None``.
        """
        with self.session:
            shop_config = (
                self.session.query(ShopConfiguration)
                .filter(ShopConfiguration.shop_id == shop_id)
                .first()
            )
            return shop_config.is_day, shop_config.is_shift

    def get_all_shops(self):
        """Return ``(id, shop_id, is_day, is_shift)`` for every shop configuration."""
        with self.session:
            return self.session.query(
                ShopConfiguration.id,
                ShopConfiguration.shop_id,
                ShopConfiguration.is_day,
                ShopConfiguration.is_shift,
            ).all()
Editor is loading...
Leave a Comment