############################################################################## # # Copyright (C) 2016 Exo Software, Lda. (<https://exosoftware.pt>) # ############################################################################## import base64 import json import logging from datetime import date, timedelta, datetime from dateutil.relativedelta import relativedelta from odoo import api, models, fields _logger = logging.getLogger(__name__) # pylint: disable=C0103 class ResCompany(models.Model): _inherit = "res.company" @api.model def l10n_pt_get_saft( self, company_ids=[], saft_type="F", period="month_last", date_from=None, date_to=None, self_billing_partner_ids=[], send_email=False, email_recipients=[], invoices=False ): """ A method to obtain a SAF-T PT file without using the user interface :param company_ids: List of companys or False for all the PT companies :param saft_type: C, F, I or S (Accounting, Invoicing, Integrated, Self-Billing). Default is F :param period: One of month_last, month_curr, quarter_last, quarter_curr, year_last, year_curr :param date_from: First day of the extraction period. If empty it will be computed from the period parameter :param date_to: Last day of the extraction period. If empty it will be computed from the period parameter :param self_billing_partner_ids: List of self-billing partners or False for all partners (when saft_type == "S") :param send_email: Boolean that determines whether an email should be sent or not. :param email_recipients: A list of email addresses to which the email will be sent. (when saft_type != "S") :param invoices: Boolean that determines if invoices should be sent or not. (when saft_type == "S") :return: A list of dictionaries containing: 'company_id': The id of the company 'filename' : The name of the SAF-T file 'file_obj': The file object of the SAF-T in base64 format 'error': A boolean indicating if there are any errors 'dataport_log_id': The recordset of the dataport """ pt_companys = self.search([("pt_invoicing", "=", True)]) if not date_from or not date_to: date_from, date_to = self.l10n_pt_set_date(period) if company_ids: result = self.browse(company_ids)._l10n_pt_get_company_saft( saft_type, date_from, date_to, self_billing_partner_ids ) else: result = pt_companys._l10n_pt_get_company_saft( saft_type, date_from, date_to, self_billing_partner_ids ) if send_email: for values in result: if saft_type == "S": dataport_log = self.env["dataport.log"].browse(values.get("dataport_log_id")) self.l10n_pt_mass_mailing_handler(content=values, partner_id=dataport_log.self_billing_partner_id, invoice=invoices, saft_type=saft_type) else: self.l10n_pt_mass_mailing_handler(content=values, email_recipients=email_recipients, invoice=invoices, saft_type=saft_type) for res in result: _logger.info("Output filename: {}".format(res["filename"])) return result def _l10n_pt_get_company_saft(self, saft_type="F", date_from=None, date_to=None, self_billing_partner_ids=None): """ Generates SAF-T files for each company based on the specified parameters. :param saft_type: Type of SAF-T file to generate :param date_from: The starting date for the data to be included in the SAF-T file. :param date_to: The ending date for the data to be included in the SAF-T file. :param self_billing_partner_ids: List of partner IDs for self-billing, if applicable. :return: A list of dictionaries containing information about the generated SAF-T files. """ result = [] for company in self: if not company.pt_invoicing: _logger.info( f"Skipping company {company.name}: pt invoicing is not active") continue _logger.info(f"Exporting SAF-T file for company {company.name}") saft_wizs = company.env["dataport.export.saft"] if saft_type == "S": partner_ids = self._get_self_billing_partner(self_billing_partner_ids, company.id) partner_invoices = self.env["account.move"].search([ ('partner_id', 'in', partner_ids.ids), ('state', '=', 'posted'), ('invoice_date', '>=', date_from), ('invoice_date', '<=', date_to), ('move_type', '=', 'in_invoice') ]).mapped('partner_id') for partner_id in partner_invoices: saft_wiz = saft_wizs.create({ "company_id": company.id, "statement_type": "saf-t", "type": saft_type, "date_to": date_to, "date_from": date_from, "l10n_pt_self_billing_partner_id": partner_id.id }) saft_wizs += saft_wiz else: saft_wizs = saft_wizs.create({ "company_id": company.id, "statement_type": "saf-t", "type": saft_type, "date_to": date_to, "date_from": date_from, }) for saft_wiz in saft_wizs: saft_wiz.export_xml() dataport = saft_wiz.create_log_record() if saft_type == "S": dataport.write({ "self_billing_partner_id": partner_id.id }) result.append({ "company_id": company.id, "filename": saft_wiz.filename, "file_obj": saft_wiz.data, "dataport_log_id": dataport.id, "error": saft_wiz.error, }) _logger.info(f"SAF-T extraction for company {company.name} completed") return result def l10n_pt_mass_mailing_handler(self, content=None, partner_id=False, email_recipients=[], invoice=False, saft_type="F"): """ Handles mass mailing for SAF-T documents. Depending on the SAFT type, it sends emails to either a list of partners or a list of email recipients with relevant attachments. :param content: Content of the email including filename and file_obj. :param partner_id: Partner ID to whom the email will be sent. Default is False. :param email_recipients: List of email recipients if SAF-T type != 'S'. :param invoice: Flag to determine if invoice attachments should be included when SAF-T type == 'S'. :param saft_type: Type of SAF-T, 'S' for sending to partners from partner_ids, any other value for sending to email_recipients. """ def create_attachments(partner, content, invoice): """ Creates email attachments for partners. This includes both general attachments and specific invoice attachments if the invoice flag is set. :param partner: The partner for whom the attachments are being created. :param content: Content of the email including filename and file_obj. :param invoice: Flag to determine if invoice attachments should be included. :return: A list of dictionaries, each representing an attachment with its name and data. """ attachments = [{'attachment_name': content.get('filename'), 'attachment_data': content.get('file_obj')}] if invoice and saft_type == "S": partner_invoices = self.env["account.move"].search([ ('partner_id', '=', partner.id), ('state', '=', 'posted'), ('invoice_date', '>=', content.get('dataport_log_id').date_from), ('invoice_date', '<=', content.get('dataport_log_id').date_to), ('move_type', '=', 'in_invoice') ]) for inv in partner_invoices: if not inv.message_main_attachment_id: pdf_content, _ = self.env[ "ir.actions.report"].sudo()._render_qweb_pdf( "account.account_invoices", res_ids=inv.id) attachments.append({ 'attachment_name': inv._get_report_base_filename(), 'attachment_data': base64.b64encode(pdf_content) }) else: attachments.append({ 'attachment_name': inv.message_main_attachment_id.name, 'attachment_data': inv.message_main_attachment_id.datas }) return attachments def create_and_send_email(email_to, res_id, attachments, email_fields): """ Creates and sends an email to a specified recipient using a template. The email includes attachments and is customized based on the provided model ID and email fields. :param email_to: Char of email address's of the recipient separated with comma. :param res_id: The ID of the record instance that email is related to. :param attachments: A list of attachments to include in the email. :param email_fields: List of fields to include in the email body. """ template_id = self.env.ref('ptplus_saft.email_template_partner_saft') email_values = template_id.generate_email(res_id, email_fields) email_values.update({ "email_from": self.env.company.email_formatted, "email_to": email_to, "model": "res.partner", "res_id": res_id, "attachments": attachments }) # Create attachment records attachment_ids = [] for attachment in attachments: attachment_record = self.env["ir.attachment"].create({ "name": attachment.get("attachment_name"), "type": "binary", "datas": attachment.get("attachment_data"), "res_model": "res.partner", "res_id": res_id, }) attachment_ids.append(attachment_record.id) # Send the email with the specified layout and attachments if template_id: template_id.send_mail( res_id, force_send=True, email_layout_xmlid='mail.mail_notification_light', email_values={ 'attachment_ids': attachment_ids, 'email_to': email_to or False }, ) # Main logic to handle different types of mass mailing if saft_type == "S" and partner_id: attachments = create_attachments(partner_id, content, invoice) create_and_send_email(partner_id.email, partner_id.id, attachments, ["body_html"]) else: attachments = create_attachments(None, content, False) create_and_send_email(", ".join(email_recipients), self.env.company.partner_id.id, attachments, ["body_html"]) def _get_self_billing_partner(self, self_billing_partner_ids, company_id): """ Retrieves partners eligible for self-billing. If specific partner IDs are provided, it filters those partners based on the company_id. Otherwise, it searches all partners who are marked for self-billing in the specified company. :param self_billing_partner_ids: List of partner IDs to consider, or None to search all. :param company_id: The ID of the company to filter the partners by. :return: Recordset of partners matching the criteria. """ if self_billing_partner_ids: partners = self.env["res.partner"].browse( self_billing_partner_ids).filtered_domain([ '|', ('company_id', '=', company_id), ('company_id', '=', False) ]) else: partners = self.env["res.partner"].search([ ('l10n_pt_self_billing', '=', True), '|', ('company_id', '=', company_id), ('company_id', '=', False) ]) return partners def l10n_pt_set_date(self, period): today = fields.Date.from_string(fields.Date.context_today(self)) if period.startswith("month"): period_start = date(today.year, today.month, 1) if period == "month_last": period_start -= relativedelta(months=1) delta = relativedelta(months=1, days=-1) elif period.startswith("quarter"): quarter = (today.month - 1) // 3 if period == "quarter_last": quarter -= 1 month_start = 3 * quarter + 1 period_start = date(today.year, month_start, 1) delta = relativedelta(months=3, days=-1) elif period.startswith("year"): period_start = date(today.year, 1, 1) if period == "year_last": period_start -= relativedelta(years=1) delta = relativedelta(years=1, days=-1) date_from = period_start date_to = period_start + delta return date_from, date_to
