# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 8 months ago
# 15 kB
# 0
# Indexable
# Never
##############################################################################
#
#    Copyright (C) 2016 Exo Software, Lda. (<https://exosoftware.pt>)
#
##############################################################################
import base64
import json
import logging
from datetime import date, timedelta, datetime

from dateutil.relativedelta import relativedelta

from odoo import api, models, fields

# Module-level logger, named after this module so log records can be traced back to it.
_logger = logging.getLogger(__name__)  # pylint: disable=C0103


class ResCompany(models.Model):
    _inherit = "res.company"

    @api.model
    def l10n_pt_get_saft(
        self,
        company_ids=None,
        saft_type="F",
        period="month_last",
        date_from=None,
        date_to=None,
        self_billing_partner_ids=None,
        send_email=False,
        email_recipients=None,
        invoices=False
    ):
        """ A method to obtain a SAF-T PT file without using the user interface

        :param company_ids:                 List of company ids. When empty/None, every company
                                            with ``pt_invoicing`` set is exported
        :param saft_type:                   C, F, I or S (Accounting, Invoicing, Integrated,
                                            Self-Billing). Default is F
        :param period:                      One of month_last, month_curr, quarter_last,
                                            quarter_curr, year_last, year_curr
        :param date_from:                   First day of the extraction period.
        :param date_to:                     Last day of the extraction period.
                                            NOTE: unless BOTH date_from and date_to are given,
                                            both are computed from the period parameter
        :param self_billing_partner_ids:    List of self-billing partner ids, or empty/None for
                                            all self-billing partners (when saft_type == "S")
        :param send_email:                  Boolean that determines whether an email should be sent
                                            or not.
        :param email_recipients:            A list of email addresses to which the email will be sent.
                                            (when saft_type != "S")
        :param invoices:                    Boolean that determines if invoices should be sent
                                            or not. (when saft_type == "S")

        :return:                            A list of dictionaries containing:
                                            'company_id':       The id of the company
                                            'filename' :        The name of the SAF-T file
                                            'file_obj':         The SAF-T file content as stored on
                                                                the export wizard (``data`` field)
                                            'error':            The ``error`` value of the export wizard
                                            'dataport_log_id':  The id of the dataport log record

        """
        # None is used as default instead of a shared mutable list.
        email_recipients = list(email_recipients or [])

        if not date_from or not date_to:
            date_from, date_to = self.l10n_pt_set_date(period)

        # Only search for the PT companies when no explicit selection was given.
        if company_ids:
            companies = self.browse(company_ids)
        else:
            companies = self.search([("pt_invoicing", "=", True)])

        result = companies._l10n_pt_get_company_saft(
            saft_type, date_from, date_to, self_billing_partner_ids
        )

        if send_email:
            for values in result:
                mail_kwargs = {
                    "content": values,
                    "invoice": invoices,
                    "saft_type": saft_type,
                }
                if saft_type == "S":
                    # Self-billing files go to the partner recorded on the dataport log.
                    dataport_log = self.env["dataport.log"].browse(
                        values.get("dataport_log_id"))
                    mail_kwargs["partner_id"] = dataport_log.self_billing_partner_id
                else:
                    mail_kwargs["email_recipients"] = email_recipients
                self.l10n_pt_mass_mailing_handler(**mail_kwargs)

        for res in result:
            _logger.info("Output filename: %s", res["filename"])

        return result

    def _l10n_pt_get_company_saft(self, saft_type="F", date_from=None, date_to=None, self_billing_partner_ids=None):
        """
            Generates SAF-T files for each company based on the specified parameters.

            Companies without ``pt_invoicing`` are skipped. For saft_type "S" one export
            wizard is created per partner that has posted vendor bills ('in_invoice') of
            the company dated within the period; for any other type a single wizard is
            created per company.

            :param saft_type:                   Type of SAF-T file to generate
            :param date_from:                   The starting date for the data to be included in the SAF-T file.
            :param date_to:                     The ending date for the data to be included in the SAF-T file.
            :param self_billing_partner_ids:    List of partner IDs for self-billing, if applicable.

            :return: A list of dictionaries (company_id, filename, file_obj, dataport_log_id,
                     error), one per generated SAF-T file.
        """
        result = []
        for company in self:
            if not company.pt_invoicing:
                _logger.info(
                    "Skipping company %s: pt invoicing is not active", company.name)
                continue

            _logger.info("Exporting SAF-T file for company %s", company.name)
            wizard_model = company.env["dataport.export.saft"]
            wizard_vals = {
                "company_id": company.id,
                "statement_type": "saf-t",
                "type": saft_type,
                "date_to": date_to,
                "date_from": date_from,
            }

            if saft_type == "S":
                candidates = self._get_self_billing_partner(self_billing_partner_ids,
                                                            company.id)
                # Restrict to this company's bills: partners may be shared between
                # companies (company_id unset), and a partner only invoiced elsewhere
                # must not get a self-billing file for this company.
                billed_partners = self.env["account.move"].search([
                    ('partner_id', 'in', candidates.ids),
                    ('company_id', '=', company.id),
                    ('state', '=', 'posted'),
                    ('invoice_date', '>=', date_from),
                    ('invoice_date', '<=', date_to),
                    ('move_type', '=', 'in_invoice')
                ]).mapped('partner_id')

                saft_wizs = wizard_model
                for partner in billed_partners:
                    saft_wizs += wizard_model.create(dict(
                        wizard_vals,
                        l10n_pt_self_billing_partner_id=partner.id,
                    ))
            else:
                saft_wizs = wizard_model.create(wizard_vals)

            for saft_wiz in saft_wizs:
                saft_wiz.export_xml()

                dataport = saft_wiz.create_log_record()
                if saft_type == "S":
                    # Take the partner from the wizard being processed; a variable left
                    # over from the creation loop would always be the last partner.
                    dataport.write({
                        "self_billing_partner_id":
                            saft_wiz.l10n_pt_self_billing_partner_id.id
                    })
                result.append({
                    "company_id": company.id,
                    "filename": saft_wiz.filename,
                    "file_obj": saft_wiz.data,
                    "dataport_log_id": dataport.id,
                    "error": saft_wiz.error,
                })

            _logger.info("SAF-T extraction for company %s completed", company.name)
        return result
    def l10n_pt_mass_mailing_handler(self, content=None, partner_id=False,
                                     email_recipients=None, invoice=False, saft_type="F"):
        """
            Handles mass mailing for SAF-T documents. Depending on the SAFT type,
            it sends one email either to a partner or to a list of email recipients,
            with the SAF-T file (and optionally the partner invoices) attached.

            The email is sent with the 'ptplus_saft.email_template_partner_saft'
            template; only the recipients and attachments are overridden here.

            :param content:             Dictionary describing the SAF-T file: 'filename', 'file_obj'
                                        and 'dataport_log_id' (id or record of the dataport log).
                                        When empty, nothing is sent.
            :param partner_id:          res.partner record to whom the email will be sent
                                        (SAF-T type == 'S'). Default is False.
            :param email_recipients:    List of email recipients if SAF-T type != 'S'.
            :param invoice:             Flag to determine if invoice attachments should be included when
                                        SAF-T type == 'S'.
            :param saft_type:           Type of SAF-T, 'S' for sending to partner_id, any
                                        other value for sending to email_recipients.
        """
        if not content:
            _logger.warning("SAF-T email not sent: no file content was provided")
            return

        # None is used as default instead of a shared mutable list; a single
        # address given as a string is accepted as well.
        if isinstance(email_recipients, str):
            email_recipients = [email_recipients]
        email_recipients = email_recipients or []

        def get_dataport_log():
            """
                Returns the dataport log referenced by the content. The callers in this
                model store the record id, so an integer is resolved to a record.
            """
            log = content.get('dataport_log_id')
            if isinstance(log, int):
                log = self.env["dataport.log"].browse(log)
            return log

        def create_attachments(partner, include_invoices):
            """
                Builds the attachment descriptions for the email: always the SAF-T file,
                plus the partner's posted vendor bills of the log period when requested.

                :param partner:             The partner whose invoices may be attached (or None).
                :param include_invoices:    Flag to determine if invoice attachments should be included.

                :return:            A list of dictionaries, each representing an attachment with its name and data.
            """
            attachments = [{'attachment_name': content.get('filename'),
                            'attachment_data': content.get('file_obj')}]
            if not (include_invoices and partner):
                return attachments

            dataport_log = get_dataport_log()
            if not dataport_log:
                _logger.warning(
                    "No dataport log available: invoices are not attached to the SAF-T email")
                return attachments

            partner_invoices = self.env["account.move"].search([
                ('partner_id', '=', partner.id),
                ('state', '=', 'posted'),
                ('invoice_date', '>=', dataport_log.date_from),
                ('invoice_date', '<=', dataport_log.date_to),
                ('move_type', '=', 'in_invoice')
            ])
            for inv in partner_invoices:
                main_attachment = inv.message_main_attachment_id
                if main_attachment:
                    # Reuse the document already stored on the invoice.
                    attachments.append({
                        'attachment_name': main_attachment.name,
                        'attachment_data': main_attachment.datas
                    })
                    continue
                # No stored document: render the invoice report on the fly.
                pdf_content, _report_type = self.env[
                    "ir.actions.report"].sudo()._render_qweb_pdf(
                    "account.account_invoices", res_ids=inv.id)
                attachments.append({
                    'attachment_name': inv._get_report_base_filename(),
                    'attachment_data': base64.b64encode(pdf_content)
                })
            return attachments

        def create_and_send_email(email_to, res_id, attachments):
            """
                Stores the attachments on the given partner and sends the template
                email for that partner.

                :param email_to:            Char of email address's of the recipient separated with comma.
                :param res_id:              The ID of the res.partner record the email is related to.
                :param attachments:         A list of attachments to include in the email.
            """
            # env.ref raises when the template does not exist, so no further check is needed.
            template = self.env.ref('ptplus_saft.email_template_partner_saft')

            attachment_ids = self.env["ir.attachment"].create([{
                "name": attachment.get("attachment_name"),
                "type": "binary",
                "datas": attachment.get("attachment_data"),
                "res_model": "res.partner",
                "res_id": res_id,
            } for attachment in attachments]).ids

            template.send_mail(
                res_id,
                force_send=True,
                email_layout_xmlid='mail.mail_notification_light',
                email_values={
                    'attachment_ids': attachment_ids,
                    'email_to': email_to or False
                },
            )

        # Main logic to handle different types of mass mailing
        if saft_type == "S" and partner_id:
            attachments = create_attachments(partner_id, invoice)
            create_and_send_email(partner_id.email, partner_id.id, attachments)
        else:
            # Not a self-billing mail (or no partner): the email is linked to the
            # current company's partner and sent to the explicit recipients.
            attachments = create_attachments(None, False)
            create_and_send_email(", ".join(email_recipients),
                                  self.env.company.partner_id.id, attachments)

    def _get_self_billing_partner(self, self_billing_partner_ids, company_id):
        """
        Returns the partners to consider for self-billing in a company.

        Partners are always limited to those belonging to the given company or to
        no company at all. When explicit ids are given, those partners are browsed
        and filtered; otherwise all partners flagged with ``l10n_pt_self_billing``
        are searched.

        :param self_billing_partner_ids: List of partner IDs to consider, or None to search all.
        :param company_id: The ID of the company to filter the partners by.
        :return: Recordset of partners matching the criteria.

        """
        partner_model = self.env["res.partner"]
        company_scope = [
            '|',
            ('company_id', '=', company_id),
            ('company_id', '=', False)
        ]

        if not self_billing_partner_ids:
            flagged = [('l10n_pt_self_billing', '=', True)]
            return partner_model.search(flagged + company_scope)

        requested = partner_model.browse(self_billing_partner_ids)
        return requested.filtered_domain(company_scope)

    def l10n_pt_set_date(self, period):
        """
        Computes the first and last day of an extraction period, relative to
        today's date in the context timezone.

        :param period: One of month_last, month_curr, quarter_last, quarter_curr,
                       year_last, year_curr. Only the exact ``*_last`` values select
                       the previous period; any other value starting with "month",
                       "quarter" or "year" selects the current one.
        :return: Tuple (date_from, date_to) of ``datetime.date`` objects.
        :raises ValueError: If the period does not start with a known unit.
        """
        period = period or ""
        today = fields.Date.from_string(fields.Date.context_today(self))

        if period.startswith("month"):
            span = relativedelta(months=1)
            period_start = date(today.year, today.month, 1)
        elif period.startswith("quarter"):
            span = relativedelta(months=3)
            first_month = 3 * ((today.month - 1) // 3) + 1
            period_start = date(today.year, first_month, 1)
        elif period.startswith("year"):
            span = relativedelta(years=1)
            period_start = date(today.year, 1, 1)
        else:
            raise ValueError(
                "Unknown SAF-T period %r: expected one of month_last, month_curr, "
                "quarter_last, quarter_curr, year_last, year_curr" % (period,))

        if period in ("month_last", "quarter_last", "year_last"):
            # Date arithmetic handles the year change, e.g. the quarter before
            # Q1 is Q4 of the previous year.
            period_start -= span

        date_from = period_start
        # Last day of the period: one full span ahead, minus one day.
        date_to = period_start + span + relativedelta(days=-1)

        return date_from, date_to


# Leave a Comment