Untitled
unknown
plain_text
a year ago
22 kB
6
Indexable
from django.http import QueryDict
from appCalendar.models import GenerateMeetingLink
from appCalendar.serializers import MeetingSerializer
from appGH.filters import NurseFilter
from appGH.serializers import DoctorSerializer, NurseSerializer
from rest_framework import viewsets, status, mixins
from rest_framework.exceptions import ValidationError
from rest_framework.response import Response
import datetime
import dateutil.parser
from django.db.models import Q, Prefetch, Count
from rest_framework.permissions import IsAuthenticated
from appUser.permissions import APIUserPermissions
from appSchedule.exception import ScheduleException
from .filters import DoctorScheduleFilters
from . import serializers
from .models import Schedule
from appGH.models import Doctor, Nurse, CompanyGroup, Careteam
from django_filters.rest_framework import DjangoFilterBackend
from .permissions import SchedulePermission
from django.db import transaction
from .utils import user_type, user_companies, validate_careteam_schedule
# Number of additional weekly occurrences generated after the first schedule
# when a schedule is created as recurrent (used as the loop bound in
# create_view, where each iteration shifts start/stop by seven days).
SCHEDULE_MAX_RECURRENCY = 8
def filter_queryset(self):
    """Return the view's professionals that have schedules in a date window.

    Reads ``startDate`` / ``stopDate`` from the query string (parsed
    day-first; ``stopDate`` is moved to 23:59:59 of that day). Every query
    parameter whose name starts with ``schedule`` is removed from the request
    and reused as an extra ``Schedule`` filter for the prefetch; what remains
    is applied through the view's own ``filter_queryset`` method.

    Args:
        self: The viewset instance handling the request (this is a
            module-level helper, called explicitly with the view).

    Returns:
        The view queryset restricted to professionals with a schedule that
        starts or stops inside the window, with ``schedule_id`` prefetched
        from schedules whose careteam / company group is unset or contained
        in the values returned by ``user_type`` / ``user_companies``.

    Raises:
        ValidationError: If a date cannot be parsed, or if ``user_type`` or
            ``user_companies`` returns ``None`` for the requesting user.
    """
    query_params = self.request.query_params
    start_date = query_params.get('startDate')
    stop_date = query_params.get('stopDate')

    # NOTE(review): the source indentation was lost; this assumes only the
    # parsing is guarded by the check below. Both callers in this module
    # invoke the helper only when both dates are present -- confirm.
    if start_date and stop_date:
        try:
            start_date = dateutil.parser.parse(start_date, dayfirst=True)
            stop_date = dateutil.parser.parse(
                stop_date, dayfirst=True).replace(hour=23, minute=59, second=59)
        except (ValueError, OverflowError) as exc:
            # dateutil signals bad input with ValueError (ParserError) or
            # OverflowError; anything else is a real bug and must surface.
            raise ValidationError({"message": "Data inválida"}) from exc

    # The query dict has to be mutable so the schedule-specific keys can be
    # popped before the remaining parameters are applied to the professional
    # queryset below.
    query_params._mutable = True
    # Drops the ``schedule`` prefix plus two more characters
    # (presumably a ``__`` separator -- verify against the API clients).
    prefix_length = len('schedule') + 2
    schedule_filters_dict = {}
    for key in list(query_params.keys()):
        if key.startswith('schedule'):
            schedule_filters_dict[key[prefix_length:]] = query_params.pop(key)[0]

    user_request = self.request.user
    careteam_user = user_type(user_request)
    if careteam_user is None:
        raise ValidationError({
            "message": "Usuário sem acesso aos horários médicos"
        })
    companies_user = user_companies(user_request)
    if companies_user is None:
        raise ValidationError({
            "message": "Usuário sem acesso aos horários médicos"
        })

    # A schedule counts as inside the window when it starts or stops there.
    in_window = (
        Q(schedule_id__start__gte=start_date, schedule_id__start__lt=stop_date) |
        Q(schedule_id__stop__gt=start_date, schedule_id__stop__lte=stop_date)
    )
    visible_schedules = Schedule.objects.filter(
        Q(careteam_id=None) | Q(careteam_id__in=careteam_user),
        Q(company_group_id=None) | Q(company_group_id__in=companies_user),
        **schedule_filters_dict).distinct()

    queryset = self.filter_queryset(
        self.get_queryset().filter(in_window).prefetch_related(
            Prefetch('schedule_id', queryset=visible_schedules)))
    return queryset
def list_view(self, request, *args, **kwargs):
    """List professionals, optionally with their schedules inside a window.

    When both ``startDate`` and ``stopDate`` are supplied, the response is a
    hand-built list of ``{'id', 'name', 'schedule_id'}`` dicts in which
    ``schedule_id`` holds only the serialized schedules that start or stop
    inside the window. Otherwise the view's regular serializer is used on the
    filtered queryset.

    Args:
        self: The viewset instance handling the request.
        request: The incoming request (the helper reads ``self.request``).
        *args, **kwargs: Accepted for signature compatibility; unused.

    Returns:
        A ``Response`` with the list payload.

    Raises:
        ValidationError: If either date cannot be parsed.
    """
    start_date = self.request.query_params.get('startDate')
    stop_date = self.request.query_params.get('stopDate')
    self.request.query_params._mutable = False

    if start_date and stop_date:
        try:
            start_date = dateutil.parser.parse(start_date, dayfirst=True)
            stop_date = dateutil.parser.parse(
                stop_date, dayfirst=True).replace(hour=23, minute=59, second=59)
        except (ValueError, OverflowError) as exc:
            # Only parsing failures mean "invalid date"; other errors surface.
            raise ValidationError({"message": "Data inválida"}) from exc

        instances = []
        for instance in filter_queryset(self):
            # Re-check the window in Python: the queryset above was only
            # restricted to professionals having a matching schedule, so the
            # prefetched schedules themselves still need the window filter.
            schedules_in_window = [
                serializers.ScheduleSerializer(schedule).data
                for schedule in instance.schedule_id.all()
                if start_date <= schedule.start < stop_date
                or start_date < schedule.stop <= stop_date
            ]
            instances.append({
                'id': instance.id,
                'name': instance.user_id.name,
                'schedule_id': schedules_in_window,
            })
        return Response(instances, status=status.HTTP_200_OK)

    # No date window: prefetch every schedule and use the default serializer.
    queryset = self.filter_queryset(self.get_queryset().prefetch_related(
        Prefetch('schedule_id', queryset=Schedule.objects.all())
    ))
    serializer = self.get_serializer(queryset, many=True)
    return Response(serializer.data)
def retrieve_view(self, request, *args, **kwargs):
    """Retrieve one professional, optionally with schedules inside a window.

    When both ``startDate`` and ``stopDate`` are supplied, the response is
    ``{'id', 'schedule_id'}`` where ``schedule_id`` lists the serialized
    schedules that start or stop inside the window, each extended with a
    ``can_schedule`` flag computed by ``validate_careteam_schedule`` for the
    ``careteam_id__id`` query parameter. Otherwise the view's regular
    serializer is used.

    Args:
        self: The viewset instance handling the request.
        request: The incoming request (the helper reads ``self.request``).
        *args, **kwargs: Accepted for signature compatibility; unused.

    Returns:
        A ``Response`` with the detail payload.

    Raises:
        ValidationError: If either date cannot be parsed.
    """
    instance = self.get_object()
    start_date = self.request.query_params.get('startDate')
    stop_date = self.request.query_params.get('stopDate')
    careteam_id = self.request.query_params.get('careteam_id__id', None)

    if start_date and stop_date:
        try:
            start_date = dateutil.parser.parse(start_date, dayfirst=True)
            stop_date = dateutil.parser.parse(
                stop_date, dayfirst=True).replace(hour=23, minute=59, second=59)
        except (ValueError, OverflowError) as exc:
            # Only parsing failures mean "invalid date"; other errors surface.
            raise ValidationError({"message": "Data inválida"}) from exc

        doctor = filter_queryset(self).filter(id=instance.id).first()
        response = {'id': instance.id}
        # Prefer the filtered instance, whose schedules were prefetched with
        # the careteam/company restrictions applied.
        # NOTE(review): the source indentation was lost. This assumes the loop
        # also runs for the unfiltered instance when the filtered lookup
        # returns nothing; in that fallback the prefetch restrictions are not
        # applied -- confirm the intended nesting.
        if doctor is not None:
            instance = doctor

        schedules = []
        for schedule in instance.schedule_id.all():
            if start_date <= schedule.start < stop_date or start_date < schedule.stop <= stop_date:
                schedule_data = serializers.ScheduleSerializer(schedule).data
                schedule_data['can_schedule'] = validate_careteam_schedule(schedule, careteam_id)
                schedules.append(schedule_data)

        response['schedule_id'] = schedules
        return Response(response, status=status.HTTP_200_OK)

    serializer = self.get_serializer(instance)
    return Response(serializer.data)
def check_professional_schedule(professional_instance, start, stop, exclude_params=None):
    """Ensure ``[start, stop]`` is a valid, conflict-free slot for a professional.

    Args:
        professional_instance: Object exposing its schedules through
            ``schedule_id.all()``.
        start: Start of the candidate slot.
        stop: End of the candidate slot.
        exclude_params: Optional lookup kwargs for schedules to ignore during
            the conflict search (for example the schedule being edited).

    Returns:
        ``True`` when the slot is valid and free.

    Raises:
        ScheduleException: With HTTP 400 when ``start >= stop`` or when an
            existing schedule conflicts with the slot.
    """
    instance_schedules = professional_instance.schedule_id.all()

    # An empty or inverted interval is rejected before touching the database.
    if start >= stop:
        raise ScheduleException(
            "O horário de início não pode ser posterior ou igual ao horário final.", status.HTTP_400_BAD_REQUEST)

    # Three ways an existing schedule can collide with the candidate slot.
    conflict_lookups = (
        {"start__lt": stop, "stop__gt": stop},      # spans the candidate's end
        {"start__gte": start, "stop__lte": stop},   # lies inside the candidate
        {"start__lt": start, "stop__gt": start},    # spans the candidate's start
    )
    for lookup in conflict_lookups:
        conflicts = instance_schedules.filter(**lookup)
        if exclude_params:
            conflicts = conflicts.exclude(**exclude_params)
        if conflicts.exists():
            raise ScheduleException(
                f"Conflito entre os horários \'{start.strftime('%d/%m/%Y %X')}\' e \'{stop.strftime('%d/%m/%Y %X')}\'. "
                "Já existe um horário para esse profissional neste horário.", status.HTTP_400_BAD_REQUEST)
    return True
def prebuild_schedule_obj(data, professional_instance):
    """Build an unsaved ``Schedule`` from request data for a professional.

    Args:
        data: Mapping with ``start``, ``stop`` and ``grade`` (required) plus
            the optional keys ``description``, ``careteam``, ``on_duty``,
            ``recurrent_schedule``, ``recurrent`` and ``available``.
            ``stop - start`` is stored as the schedule's duration.
        professional_instance: Object assigned to ``content_object``.

    Returns:
        A new ``Schedule`` instance; nothing is written to the database here.
    """
    begins_at, ends_at = data["start"], data["stop"]
    is_on_duty = data.get("on_duty", False)
    return Schedule(
        start=begins_at,
        stop=ends_at,
        grade=data["grade"],
        duration=ends_at - begins_at,
        description=data.get("description", None),
        careteam_id=data.get("careteam", None),
        on_duty=is_on_duty,
        recurrent_schedule=data.get("recurrent_schedule", None),
        recurrent=data.get("recurrent", False),
        # On-duty schedules are always available, whatever the client sent.
        available=True if is_on_duty else data.get("available", True),
        content_object=professional_instance,
    )
def create_view(model, self, request, *args, **kwargs):
    """Create a schedule, and optionally its weekly recurrences, for a professional.

    Reads ``professional_id`` and the ``schedule_id`` payload from
    ``request.data``. When the payload's ``recurrent`` flag is truthy,
    ``SCHEDULE_MAX_RECURRENCY`` additional schedules are created, each one
    week after the previous, all pointing to the first one through
    ``recurrent_schedule``.

    Args:
        model: Professional model class to look the professional up on.
        self: The viewset instance (unused; kept for the existing call sites).
        request: The incoming request.
        *args, **kwargs: Accepted for signature compatibility; unused.

    Returns:
        A 201 ``Response`` with the first schedule serialized by
        ``ScheduleSerializerWithProfessional``.

    Raises:
        ScheduleException: With HTTP 400 when the professional does not exist,
            the careteam does not belong to the professional, a date is
            missing or unparsable, or any generated slot is invalid or
            conflicts with an existing schedule.
    """
    schedule = request.data.get("schedule_id")
    instance_id = request.data.get("professional_id")
    # Dropped on purpose so clients cannot choose the recurrence parent.
    schedule.pop("recurrent_schedule", None)

    try:
        professional_instance = model.objects.get(id=instance_id)
    except model.DoesNotExist as exc:
        # An unknown professional is a client error, not a server failure.
        raise ScheduleException(
            'Professional ou careteam inválidos', status.HTTP_400_BAD_REQUEST) from exc

    try:
        start = dateutil.parser.parse(schedule["start"], dayfirst=True)
        stop = dateutil.parser.parse(schedule["stop"], dayfirst=True)
    except (KeyError, TypeError, ValueError, OverflowError) as exc:
        # Missing or malformed dates are reported like the list/retrieve views do.
        raise ScheduleException("Data inválida", status.HTTP_400_BAD_REQUEST) from exc

    careteam = schedule.get('careteam', None)
    if careteam and not professional_instance.careteam_id.filter(id=careteam).exists():
        raise ScheduleException(
            'Professional ou careteam inválidos', status.HTTP_400_BAD_REQUEST)

    schedule["start"] = start
    schedule["stop"] = stop
    first_schedule_obj = prebuild_schedule_obj(
        schedule, professional_instance)
    check_professional_schedule(
        professional_instance, first_schedule_obj.start, first_schedule_obj.stop)
    # The row must exist before its many-to-many relation can be populated.
    first_schedule_obj.save()
    first_schedule_obj.company_group_id.set(schedule.get("company_group_id", []))
    first_schedule_obj.save()

    if schedule.get("recurrent", None):
        # The first schedule is its own recurrence parent.
        first_schedule_obj.recurrent_schedule = first_schedule_obj
        first_schedule_obj.recurrent = True
        schedule["recurrent_schedule"] = first_schedule_obj
        schedule["recurrent"] = True
        schedule["duration"] = stop - start

        one_week = datetime.timedelta(days=7)
        recurrences = []
        for _ in range(SCHEDULE_MAX_RECURRENCY):
            schedule["start"] += one_week
            schedule["stop"] += one_week
            recurrence = prebuild_schedule_obj(
                schedule, professional_instance)
            # Every occurrence is validated before anything is bulk-inserted.
            check_professional_schedule(
                professional_instance, recurrence.start, recurrence.stop)
            recurrences.append(recurrence)

        created_recurrences = professional_instance.schedule_id.bulk_create(
            recurrences)
        # NOTE(review): setting the relation below needs primary keys on the
        # bulk-created objects, which depends on the database backend -- confirm.
        company_groups = first_schedule_obj.company_group_id.all()
        for recurrence in created_recurrences:
            recurrence.company_group_id.set(company_groups)
            recurrence.save()
    else:
        first_schedule_obj.recurrent_schedule = None
        first_schedule_obj.recurrent = False

    # Persist the recurrence flags chosen in either branch above.
    first_schedule_obj.save()
    return Response(serializers.ScheduleSerializerWithProfessional(first_schedule_obj).data,
                    status=status.HTTP_201_CREATED)
def recurrent_update(self, request, *args, **kwargs):
    """Update a schedule and, on request, the recurrent schedules linked to it.

    The schedule's new ``start`` / ``stop`` come from ``request.data`` (parsed
    day-first). When ``recurrent_editing`` is truthy and the schedule has
    recurrent schedules, each of them (except the first returned) is updated
    with the same payload, shifted one more week per occurrence.

    Args:
        self: The viewset instance handling the request.
        request: The incoming request; ``request.data`` is mutated in place
            with the parsed dates and the computed duration.
        *args: Unused.
        **kwargs: ``partial`` (default ``False``) is forwarded to the
            serializers.

    Returns:
        A 200 ``Response`` with the updated schedule's serialized data.

    Raises:
        ScheduleException: HTTP 400 when a date is missing or unparsable, when
            the dates of a schedule that already has a meeting would change,
            or when the new slot is invalid or conflicts; HTTP 403 when a
            recurrent schedule diverges from the original times.
    """
    partial = kwargs.pop('partial', False)
    schedule = self.get_object()
    # Remember the original times: they identify diverging recurrences below.
    start_first_schedule = schedule.start
    stop_first_schedule = schedule.stop

    try:
        start = dateutil.parser.parse(request.data.get("start"), dayfirst=True)
        stop = dateutil.parser.parse(request.data.get("stop"), dayfirst=True)
    except (TypeError, ValueError, OverflowError) as exc:
        # A missing value (None) raises TypeError; malformed text raises
        # ValueError / OverflowError. Both are client errors.
        raise ScheduleException("Data inválida", status.HTTP_400_BAD_REQUEST) from exc

    # A schedule that already has a meeting must keep its date and time.
    if schedule.meeting_id and (start != schedule.start or stop != schedule.stop):
        raise ScheduleException(
            "Não é possível editar a data .", status.HTTP_400_BAD_REQUEST)

    # The schedule being edited is excluded from its own conflict search.
    check_professional_schedule(
        schedule.content_object, start, stop, {'id': schedule.id})

    recurrent_editing = request.data.get('recurrent_editing', False)
    recurrent_schedules = schedule.recurrent_schedules.all()

    request.data["start"] = start
    request.data["stop"] = stop
    request.data["duration"] = stop - start
    if request.data.get('on_duty', False):
        # On-duty schedules are always available.
        request.data["available"] = True

    serializer = self.get_serializer(
        schedule, data=request.data, partial=partial)
    serializer.is_valid(raise_exception=True)
    self.perform_update(serializer)

    if recurrent_editing and recurrent_schedules:
        # Recurrences whose start/stop time no longer matches the original
        # were edited individually; the first such row is skipped
        # (presumably the schedule just updated above -- TODO confirm).
        divergence_schedule = recurrent_schedules.exclude(
            start__time=start_first_schedule.time(), stop__time=stop_first_schedule.time())[1:]
        if divergence_schedule.exists():
            raise ScheduleException(
                "Não é possível editar esse horário pois há divergências em um dos horários recorrentes.",
                status.HTTP_403_FORBIDDEN)

        one_week = datetime.timedelta(days=7)
        for recurrent_schedule in recurrent_schedules[1:]:
            # ``duration`` is already in the payload and is unaffected by
            # shifting both ends by the same amount.
            request.data["start"] += one_week
            request.data["stop"] += one_week
            recurrent_serializer = self.get_serializer(
                recurrent_schedule, data=request.data, partial=partial)
            recurrent_serializer.is_valid(raise_exception=True)
            self.perform_update(recurrent_serializer)

    return Response(serializer.data, status=status.HTTP_200_OK)
class ScheduleViewSet(mixins.UpdateModelMixin,
                      mixins.DestroyModelMixin,
                      viewsets.GenericViewSet):
    """
    Update and delete endpoints for individual ``Schedule`` objects.

    Updates are delegated to ``recurrent_update`` inside a database
    transaction, so a failure while propagating to recurrent schedules rolls
    the whole change back.
    """
    queryset = Schedule.objects.all()
    serializer_class = serializers.ScheduleSerializer
    permission_classes = [
        IsAuthenticated,
        SchedulePermission,
        APIUserPermissions
    ]

    def update(self, request, *args, **kwargs):
        """Update a schedule atomically, mapping failures to JSON messages.

        ``*args`` / ``**kwargs`` are forwarded unpacked so the ``partial``
        flag set by ``partial_update`` (PATCH) actually reaches
        ``recurrent_update``.
        """
        try:
            with transaction.atomic():
                return recurrent_update(self, request, *args, **kwargs)
        except ScheduleException as e:
            return Response({"message": e.detail},
                            status=e.status_code)
        except Exception:
            # Catch-all boundary: any other failure is reported generically.
            return Response({"message": "Algo deu errado com o horário informado"},
                            status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def destroy(self, request, *args, **kwargs):
        """Delete a schedule unless it, or a recurrence of it, is in use.

        Deletion is refused with 403 when the schedule has a meeting, or when
        any of its recurrent schedules has a meeting or a start/stop time that
        differs from this schedule's.
        """
        schedule = self.get_object()
        recurrent_schedules = schedule.recurrent_schedules

        if schedule.meeting_id:
            return Response({"message": "Não é possível deletar um horário associado a uma consulta."},
                            status=status.HTTP_403_FORBIDDEN)

        if recurrent_schedules.exists():
            divergence_schedule = recurrent_schedules.exclude(
                start__time=schedule.start.time(), stop__time=schedule.stop.time())
            recurrent_schedules_has_meeting = recurrent_schedules.exclude(
                meeting_id=None)
            if divergence_schedule.exists() or recurrent_schedules_has_meeting.exists():
                return Response({
                    "message": "Não é possível deletar esse horário pois há divergências em um dos horários recorrentes gerados a partir deste."},
                    status=status.HTTP_403_FORBIDDEN)

        self.perform_destroy(schedule)
        return Response(status=status.HTTP_204_NO_CONTENT)
class ScheduleDoctorViewSet(mixins.RetrieveModelMixin,
                            mixins.CreateModelMixin,
                            mixins.ListModelMixin,
                            viewsets.GenericViewSet):
    """
    List, retrieve and create schedules for ``Doctor`` professionals.

    The visible doctors depend on the requesting user's active role; see
    ``get_queryset``.
    """
    queryset = Doctor.objects.all()
    serializer_class = serializers.ProfessionalScheduleSerializer
    filter_backends = [DjangoFilterBackend]
    permission_classes = [
        IsAuthenticated,
        SchedulePermission,
        APIUserPermissions
    ]
    filterset_class = DoctorScheduleFilters

    # Roles that see every doctor.
    _UNRESTRICTED_ROLES = ('admin', 'financial_administrator')
    # Role name -> lookup that must match the requesting user for a doctor to
    # be visible to that role.
    _ROLE_LOOKUPS = {
        'company_administrator': 'careteam_id__company_id__company_adm__user_id',
        'patient': 'careteam_id__patient_careteam__user_id',
        'secretary': 'careteam_id__secretary_careteam__user_id',
        'doctor': 'careteam_id__doctor_careteam__user_id',
        'nurse': 'careteam_id__nurse_careteam__user_id',
        'auditor': 'careteam_id__auditor_careteam__user_id',
        'api_user': 'careteam_id__company_id__access_company__user_id',
    }

    def get_serializer_class(self):
        """Pick the serializer for the ``metadata`` and ``retrieve`` actions."""
        if self.action == 'metadata':
            return DoctorSerializer
        if self.action == 'retrieve':
            return serializers.ScheduleSerializerWithProfessional
        return super().get_serializer_class()

    def get_queryset(self):
        """Return the doctors visible to the requesting user's active role.

        Unrestricted roles get every doctor; roles listed in
        ``_ROLE_LOOKUPS`` get the doctors matched by their lookup; any other
        role gets an empty queryset.
        """
        user_request = self.request.user
        role_name = user_request.active_role.role_name

        if role_name in self._UNRESTRICTED_ROLES:
            return self.queryset.all().distinct()

        lookup = self._ROLE_LOOKUPS.get(role_name)
        if lookup is None:
            return self.queryset.none()
        # distinct() because the lookups traverse multi-valued relations.
        return self.queryset.filter(**{lookup: user_request}).distinct()

    def list(self, request, *args, **kwargs):
        """Delegate to the shared ``list_view`` helper."""
        return list_view(self, request, *args, **kwargs)

    def retrieve(self, request, *args, **kwargs):
        """Delegate to the shared ``retrieve_view`` helper."""
        return retrieve_view(self, request, *args, **kwargs)

    def create(self, request, *args, **kwargs):
        """Create a doctor schedule atomically, mapping failures to JSON messages."""
        try:
            with transaction.atomic():
                return create_view(Doctor, self, request, *args, **kwargs)
        except ScheduleException as e:
            return Response({"message": e.detail},
                            status=e.status_code)
        except Exception:
            # Catch-all boundary: any other failure is reported generically.
            return Response({"message": "Algo deu errado com o horário informado"},
                            status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ScheduleNurseViewSet(mixins.ListModelMixin,
                           mixins.CreateModelMixin,
                           mixins.RetrieveModelMixin,
                           viewsets.GenericViewSet):
    """
    List, retrieve and create schedules for ``Nurse`` professionals.

    The visible nurses depend on the requesting user's active role; see
    ``get_queryset``.
    """
    queryset = Nurse.objects.all()
    serializer_class = serializers.ProfessionalScheduleSerializer
    permission_classes = [
        IsAuthenticated,
        SchedulePermission,
        APIUserPermissions
    ]
    # NOTE(review): no ``filter_backends`` is declared here, so this filterset
    # only takes effect if a filter backend is configured project-wide -- confirm.
    filterset_class = NurseFilter

    # Roles that see every nurse.
    # NOTE(review): ScheduleDoctorViewSet also treats 'financial_administrator'
    # as unrestricted; confirm the difference is intentional.
    _UNRESTRICTED_ROLES = ('admin',)
    # Role name -> lookup that must match the requesting user for a nurse to
    # be visible to that role.
    _ROLE_LOOKUPS = {
        'company_administrator': 'careteam_id__company_id__company_adm__user_id',
        'patient': 'careteam_id__patient_careteam__user_id',
        'secretary': 'careteam_id__secretary_careteam__user_id',
        'doctor': 'careteam_id__doctor_careteam__user_id',
        'nurse': 'careteam_id__nurse_careteam__user_id',
        'auditor': 'careteam_id__auditor_careteam__user_id',
        'api_user': 'careteam_id__company_id__access_company__user_id',
    }

    def get_serializer_class(self):
        """Pick the serializer for the ``metadata`` and ``retrieve`` actions."""
        if self.action == 'metadata':
            return NurseSerializer
        if self.action == 'retrieve':
            return serializers.ScheduleSerializerWithProfessional
        return super().get_serializer_class()

    def get_queryset(self):
        """Return the nurses visible to the requesting user's active role.

        Unrestricted roles get every nurse; roles listed in ``_ROLE_LOOKUPS``
        get the nurses matched by their lookup; any other role gets an empty
        queryset.
        """
        user_request = self.request.user
        role_name = user_request.active_role.role_name

        if role_name in self._UNRESTRICTED_ROLES:
            return self.queryset.all().distinct()

        lookup = self._ROLE_LOOKUPS.get(role_name)
        if lookup is None:
            return self.queryset.none()
        # distinct() because the lookups traverse multi-valued relations.
        return self.queryset.filter(**{lookup: user_request}).distinct()

    def list(self, request, *args, **kwargs):
        """Delegate to the shared ``list_view`` helper."""
        return list_view(self, request, *args, **kwargs)

    def retrieve(self, request, *args, **kwargs):
        """Delegate to the shared ``retrieve_view`` helper."""
        return retrieve_view(self, request, *args, **kwargs)

    def create(self, request, *args, **kwargs):
        """Create a nurse schedule atomically, mapping failures to JSON messages."""
        try:
            with transaction.atomic():
                return create_view(Nurse, self, request, *args, **kwargs)
        except ScheduleException as e:
            return Response({"message": e.detail},
                            status=e.status_code)
        except Exception:
            # Catch-all boundary: any other failure is reported generically.
            return Response({"message": "Algo deu errado com o horário informado"},
                            status=status.HTTP_500_INTERNAL_SERVER_ERROR)
Editor is loading...
Leave a Comment