# Paste metadata: Untitled · unknown · plain_text · 10 months ago · 3.1 kB · 3 · Indexable
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker, relationship
# Database setup
# SQLite database file, resolved relative to the process working directory.
DATABASE_URL = "sqlite:///./billing.db"
# FastAPI runs synchronous dependencies and path operations in a thread pool,
# so a SQLite connection may be used or closed by a thread other than the one
# that opened it. The sqlite3 driver rejects that by default;
# check_same_thread=False lifts the restriction. This is safe as long as each
# request works with its own Session and does not share it across requests.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory: transactions are committed explicitly and pending changes
# are not flushed automatically before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Models
class Customer(Base):
    """ORM model mapped to the ``customers`` table."""

    __tablename__ = "customers"
    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Indexed for lookups; not constrained to be unique.
    name = Column(String, index=True)
    # Unique across all rows, enforced by the database.
    email = Column(String, unique=True, index=True)
    # Invoice objects linked to this customer; the reverse side is
    # Invoice.customer.
    invoices = relationship("Invoice", back_populates="customer")
class Invoice(Base):
    """ORM model mapped to the ``invoices`` table."""

    __tablename__ = "invoices"
    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # NOTE(review): Float is binary floating point; if this holds a monetary
    # value, a fixed-precision type may be a better fit — confirm.
    amount = Column(Float)
    # Set to "pending" when no status is supplied at insert time.
    status = Column(String, default="pending")
    # Foreign key to customers.id.
    customer_id = Column(Integer, ForeignKey("customers.id"))
    # Owning Customer object; the reverse side is Customer.invoices.
    customer = relationship("Customer", back_populates="invoices")
# Create tables
# Runs at import time: issues CREATE TABLE for each model registered on Base
# whose table is not already present; existing tables are left untouched.
Base.metadata.create_all(bind=engine)
# FastAPI app
# Application object on which the route decorators register their handlers.
app = FastAPI()
# Dependency to get database session
def get_db():
    """Yield a fresh database session and close it afterwards.

    This is a generator: the caller receives the session at the ``yield``
    and the session is closed when the generator is resumed or finalized.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Release the session whether or not the consumer raised.
        session.close()
# Routes
@app.post("/customers/")
def create_customer(name: str, email: str, db: Session = Depends(get_db)):
    """Create a customer with a unique email address.

    Args:
        name: Customer name.
        email: Customer email; must not already be registered.
        db: Per-request database session, injected by FastAPI.

    Returns:
        The newly stored Customer, refreshed from the database.

    Raises:
        HTTPException: 400 if a customer with this email already exists.
    """
    # The session comes from Depends(get_db) so each request gets its own
    # session and get_db's cleanup runs afterwards. A plain default of
    # next(get_db()) is evaluated once at definition time, which shares a
    # single never-closed session between all requests.
    if db.query(Customer).filter(Customer.email == email).first():
        raise HTTPException(status_code=400, detail="Email already exists")
    new_customer = Customer(name=name, email=email)
    db.add(new_customer)
    try:
        db.commit()
    except IntegrityError:
        # Another request may insert the same email between the check above
        # and this commit; the unique constraint then rejects the insert.
        # Roll back so the session stays usable and report it the same way.
        db.rollback()
        raise HTTPException(status_code=400, detail="Email already exists") from None
    db.refresh(new_customer)
    return new_customer
@app.post("/invoices/")
def create_invoice(customer_id: int, amount: float, db: Session = Depends(get_db)):
    """Create an invoice for an existing customer.

    Args:
        customer_id: Primary key of the customer being invoiced.
        amount: Invoice amount.
        db: Per-request database session, injected by FastAPI.

    Returns:
        The newly stored Invoice, refreshed from the database.

    Raises:
        HTTPException: 404 if no customer has the given id.
    """
    # The session comes from Depends(get_db) so each request gets its own
    # session and get_db's cleanup runs afterwards. A plain default of
    # next(get_db()) is evaluated once at definition time, which shares a
    # single never-closed session between all requests.
    customer = db.query(Customer).filter(Customer.id == customer_id).first()
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")
    invoice = Invoice(customer_id=customer_id, amount=amount)
    db.add(invoice)
    db.commit()
    # Reload so database-populated fields (id, default status) are present.
    db.refresh(invoice)
    return invoice
@app.get("/invoices/{invoice_id}")
def get_invoice(invoice_id: int, db: Session = Depends(get_db)):
    """Return a single invoice by id.

    Args:
        invoice_id: Primary key of the invoice, taken from the URL path.
        db: Per-request database session, injected by FastAPI.

    Returns:
        The matching Invoice.

    Raises:
        HTTPException: 404 if no invoice has the given id.
    """
    # The session comes from Depends(get_db) so each request gets its own
    # session and get_db's cleanup runs afterwards. A plain default of
    # next(get_db()) is evaluated once at definition time, which shares a
    # single never-closed session between all requests.
    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if not invoice:
        raise HTTPException(status_code=404, detail="Invoice not found")
    return invoice
@app.post("/payments/")
def process_payment(invoice_id: int, payment_method: str, db: Session = Depends(get_db)):
    """Mark a pending invoice as paid using a supported payment method.

    No real payment provider is contacted; a supported method is treated as
    a successful payment.

    Args:
        invoice_id: Primary key of the invoice to pay.
        payment_method: Either "credit_card" or "paypal".
        db: Per-request database session, injected by FastAPI.

    Returns:
        A dict with a success message and the updated invoice.

    Raises:
        HTTPException: 404 if the invoice does not exist; 400 if it is not
            pending or the payment method is unsupported.
    """
    # The session comes from Depends(get_db) so each request gets its own
    # session and get_db's cleanup runs afterwards. A plain default of
    # next(get_db()) is evaluated once at definition time, which shares a
    # single never-closed session between all requests.
    invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
    if not invoice:
        raise HTTPException(status_code=404, detail="Invoice not found")
    if invoice.status != "pending":
        raise HTTPException(status_code=400, detail="Invoice already paid or cancelled")
    # Mock payment processing
    if payment_method not in ["credit_card", "paypal"]:
        raise HTTPException(status_code=400, detail="Unsupported payment method")
    invoice.status = "paid"
    db.commit()
    db.refresh(invoice)
    return {"message": "Payment successful", "invoice": invoice}
# Paste-page residue: "Editor is loading..." / "Leave a Comment"