# Untitled paste (plain_text, 3.1 kB)
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker, relationship

# Database setup
DATABASE_URL = "sqlite:///./billing.db"
# SQLite connections refuse, by default, to be used from a thread other than
# the one that created them. FastAPI runs synchronous dependencies and route
# handlers on worker threads, so that check is disabled here.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models inherit from.
Base = declarative_base()

# Models
class Customer(Base):
    """ORM model mapped to the ``customers`` table."""
    __tablename__ = "customers"
    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Indexed, but not constrained to be unique.
    name = Column(String, index=True)
    # Declared unique, so the database rejects a second row with the same email.
    email = Column(String, unique=True, index=True)
    # Invoices linked to this customer; the reverse side is ``Invoice.customer``.
    invoices = relationship("Invoice", back_populates="customer")

class Invoice(Base):
    """ORM model mapped to the ``invoices`` table."""
    __tablename__ = "invoices"
    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # NOTE(review): stored as a binary float; if this is a monetary amount,
    # a fixed-precision type may be safer — confirm before changing the schema.
    amount = Column(Float)
    # New rows default to "pending".
    status = Column(String, default="pending")
    # Foreign key to ``customers.id``.
    customer_id = Column(Integer, ForeignKey("customers.id"))
    # Owning customer; the reverse side is ``Customer.invoices``.
    customer = relationship("Customer", back_populates="invoices")

# Create tables for the models registered on Base.
# This is a module-level statement, so it runs when the module is imported.
Base.metadata.create_all(bind=engine)

# FastAPI application instance; the route decorators below register on it.
app = FastAPI()

# Dependency to get database session
def get_db():
    """Yield a new database session and close it once the caller is done.

    Written as a generator so that the ``finally`` clause runs when the
    generator is resumed or closed, including after an exception is thrown
    into it.

    Yields:
        A session created by ``SessionLocal``.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, even if the consumer raised.
        session.close()

# Routes
@app.post("/customers/")
def create_customer(name: str, email: str, db: Session = Depends(get_db)):
    """Create a customer, rejecting duplicate email addresses.

    Args:
        name: Customer name.
        email: Customer email; must not already be stored.
        db: Database session injected per request through ``get_db``.

    Returns:
        The newly stored ``Customer`` instance, refreshed from the database.

    Raises:
        HTTPException: 400 when a customer with ``email`` already exists.
    """
    # ``Depends(get_db)`` opens a fresh session for every request and lets
    # ``get_db`` close it afterwards. A ``next(get_db())`` default would be
    # evaluated once, when the function is defined, and then shared by all
    # requests without ever being closed.
    existing = db.query(Customer).filter(Customer.email == email).first()
    if existing is not None:
        raise HTTPException(status_code=400, detail="Email already exists")

    new_customer = Customer(name=name, email=email)
    db.add(new_customer)
    db.commit()
    # Reload the row so database-generated values (such as ``id``) are set.
    db.refresh(new_customer)
    return new_customer

@app.post("/invoices/")
def create_invoice(customer_id: int, amount: float, db: Session = Depends(get_db)):
    """Create an invoice for an existing customer.

    Args:
        customer_id: Primary key of the customer being invoiced.
        amount: Invoice amount.
        db: Database session injected per request through ``get_db``.

    Returns:
        The newly stored ``Invoice`` instance, refreshed from the database.

    Raises:
        HTTPException: 404 when no customer has the given ``customer_id``.
    """
    # ``Depends(get_db)`` opens a fresh session for every request and lets
    # ``get_db`` close it afterwards. A ``next(get_db())`` default would be
    # evaluated once, when the function is defined, and then shared by all
    # requests without ever being closed.
    customer = db.query(Customer).filter(Customer.id == customer_id).first()
    if customer is None:
        raise HTTPException(status_code=404, detail="Customer not found")

    invoice = Invoice(customer_id=customer_id, amount=amount)
    db.add(invoice)
    db.commit()
    # Reload the row so database-generated values and column defaults are set.
    db.refresh(invoice)
    return invoice

@app.get("/invoices/{invoice_id}")
def get_invoice(invoice_id: int, db: Session = Depends(get_db)):
    """Return a single invoice by primary key.

    Args:
        invoice_id: Primary key of the invoice, taken from the URL path.
        db: Database session injected per request through ``get_db``.

    Returns:
        The matching ``Invoice`` instance.

    Raises:
        HTTPException: 404 when no invoice has the given ``invoice_id``.
    """
    # ``Depends(get_db)`` opens a fresh session for every request and lets
    # ``get_db`` close it afterwards. A ``next(get_db())`` default would be
    # evaluated once, when the function is defined, and then shared by all
    # requests without ever being closed.
    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if invoice is None:
        raise HTTPException(status_code=404, detail="Invoice not found")
    return invoice

@app.post("/payments/")
def process_payment(invoice_id: int, payment_method: str, db: Session = Depends(get_db)):
    """Mark a pending invoice as paid using a supported payment method.

    No real payment provider is contacted; the payment step is mocked.

    Args:
        invoice_id: Primary key of the invoice to pay.
        payment_method: Either ``"credit_card"`` or ``"paypal"``.
        db: Database session injected per request through ``get_db``.

    Returns:
        A dict with a confirmation ``"message"`` and the updated ``"invoice"``.

    Raises:
        HTTPException: 404 when the invoice does not exist; 400 when the
            invoice is not ``"pending"`` or the payment method is unsupported.
    """
    # ``Depends(get_db)`` opens a fresh session for every request and lets
    # ``get_db`` close it afterwards. A ``next(get_db())`` default would be
    # evaluated once, when the function is defined, and then shared by all
    # requests without ever being closed.
    supported_methods = ("credit_card", "paypal")

    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if invoice is None:
        raise HTTPException(status_code=404, detail="Invoice not found")
    if invoice.status != "pending":
        raise HTTPException(status_code=400, detail="Invoice already paid or cancelled")

    # Mock payment processing
    if payment_method not in supported_methods:
        raise HTTPException(status_code=400, detail="Unsupported payment method")

    invoice.status = "paid"
    db.commit()
    # Reload the row so the returned object reflects the committed state.
    db.refresh(invoice)
    return {"message": "Payment successful", "invoice": invoice}
# (paste-site footer: "Leave a Comment")