Untitled
# Minimal billing API: customers, invoices and a mock payment endpoint,
# backed by a local SQLite database through SQLAlchemy.

from typing import Iterator

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker, relationship

# Database setup
DATABASE_URL = "sqlite:///./billing.db"

# FastAPI executes sync endpoints in a thread pool, and the dependency that
# creates the session may run on a different thread than the handler that
# uses it. SQLite refuses cross-thread use of a connection unless
# check_same_thread is disabled; each request still gets its own session.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Payment methods accepted by the mock payment processor.
SUPPORTED_PAYMENT_METHODS = frozenset({"credit_card", "paypal"})


# Models
class Customer(Base):
    """A billable customer; ``email`` is unique across customers."""

    __tablename__ = "customers"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)

    invoices = relationship("Invoice", back_populates="customer")


class Invoice(Base):
    """An invoice issued to a customer; ``status`` starts as "pending"."""

    __tablename__ = "invoices"

    id = Column(Integer, primary_key=True, index=True)
    amount = Column(Float)
    status = Column(String, default="pending")
    customer_id = Column(Integer, ForeignKey("customers.id"))

    customer = relationship("Customer", back_populates="invoices")


# Create tables
Base.metadata.create_all(bind=engine)

# FastAPI app
app = FastAPI()


# Dependency to get database session
def get_db() -> Iterator[Session]:
    """Yield a fresh database session and close it when the request ends.

    Meant to be used via ``Depends(get_db)`` so that FastAPI drives the
    generator: one session per request, with the ``finally`` clause run
    after the response has been produced.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Routes
# NOTE: the ``db`` parameters use ``Depends(get_db)`` rather than
# ``next(get_db())``. A default value is evaluated once, at function
# definition time, so ``next(get_db())`` would hand the same session to
# every request for the lifetime of the process.
# Return annotations are intentionally omitted on the routes: FastAPI would
# interpret them as response models.
@app.post("/customers/")
def create_customer(name: str, email: str, db: Session = Depends(get_db)):
    """Create a customer.

    Raises:
        HTTPException: 400 if a customer with ``email`` already exists.
    """
    if db.query(Customer).filter(Customer.email == email).first():
        raise HTTPException(status_code=400, detail="Email already exists")

    new_customer = Customer(name=name, email=email)
    db.add(new_customer)
    try:
        db.commit()
    except IntegrityError as exc:
        # A concurrent request can insert the same email between the lookup
        # above and this commit. The unique constraint on ``email`` is the
        # only constraint a caller can violate here, so report it the same
        # way as the pre-check instead of surfacing a 500.
        db.rollback()
        raise HTTPException(
            status_code=400, detail="Email already exists"
        ) from exc
    db.refresh(new_customer)
    return new_customer


@app.post("/invoices/")
def create_invoice(customer_id: int, amount: float, db: Session = Depends(get_db)):
    """Create a pending invoice for an existing customer.

    Raises:
        HTTPException: 404 if no customer has id ``customer_id``.
    """
    customer = db.query(Customer).filter(Customer.id == customer_id).first()
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")

    invoice = Invoice(customer_id=customer_id, amount=amount)
    db.add(invoice)
    db.commit()
    db.refresh(invoice)
    return invoice


@app.get("/invoices/{invoice_id}")
def get_invoice(invoice_id: int, db: Session = Depends(get_db)):
    """Return the invoice with id ``invoice_id``.

    Raises:
        HTTPException: 404 if the invoice does not exist.
    """
    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if not invoice:
        raise HTTPException(status_code=404, detail="Invoice not found")
    return invoice


@app.post("/payments/")
def process_payment(invoice_id: int, payment_method: str, db: Session = Depends(get_db)):
    """Mark a pending invoice as paid using a mock payment processor.

    Raises:
        HTTPException: 404 if the invoice does not exist; 400 if it is not
            pending or ``payment_method`` is not supported.
    """
    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if not invoice:
        raise HTTPException(status_code=404, detail="Invoice not found")
    if invoice.status != "pending":
        raise HTTPException(status_code=400, detail="Invoice already paid or cancelled")

    # Mock payment processing
    if payment_method not in SUPPORTED_PAYMENT_METHODS:
        raise HTTPException(status_code=400, detail="Unsupported payment method")

    invoice.status = "paid"
    db.commit()
    db.refresh(invoice)
    return {"message": "Payment successful", "invoice": invoice}
Leave a Comment