카테고리 없음
fast api & mongodb 연동
다만사
2023. 7. 15. 21:56
car.py
_id를 제거한 버전
from fastapi import FastAPI, Body, HTTPException, status
from typing import Dict, List
from decouple import config
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
DB_URL = config('DB_URL', cast=str)
DB_NAME = config('DB_NAME', cast=str)
app = FastAPI()
@app.on_event("startup")
async def startup_db_client():
app.mongodb_client = AsyncIOMotorClient(DB_URL)
app.mongodb = app.mongodb_client[DB_NAME]
@app.on_event("shutdown")
async def shutdown_db_client():
app.mongodb_client.close()
@app.get("/car/{id}")
async def root(id:str):
# if (car := await app.mongodb["cars"].find_one({"_id": ObjectId(id)})) is not None:
# return car
# raise HTTPException(status_code=404, detail=f"Car with {id} not found")
car = await app.mongodb["cars"].find_one({"_id": ObjectId(id)}, {"_id":0})
# print(car)
return car
@app.get("/cars/price")
async def cars_by_price(min_price:int=0, max_price:int=100000):
list = app.mongodb["cars"].find({
"$and": [{"price":{"$gte": min_price}},{"price":{"$lte": max_price}}]
}, {"_id":0})
result = [car async for car in list]
return result
@app.post("/cars")
async def new_car(car: Dict = Body(...)):
print(car)
car = jsonable_encoder(car)
new_car = await app.mongodb["cars"].insert_one(car)
print(new_car)
print(new_car.inserted_id)
created_car = await app.mongodb["cars"].find_one({"_id": ObjectId(new_car.inserted_id)}, {"_id":0})
#return {"message": "new_car created"}
return JSONResponse(status_code=status.HTTP_201_CREATED, content=created_car)