Merge branch 'ai/feat/detection' into 'ai/develop'

Feat: detection/train 프로젝트 명세 변경에 따른 수정

See merge request s11-s-project/S11P21S002!211
This commit is contained in:
김용수 2024-09-27 11:23:42 +09:00
commit a93825ea8f
11 changed files with 180 additions and 161 deletions

View File

@ -5,8 +5,7 @@ from schemas.predict_response import PredictResponse, LabelData
from schemas.train_report_data import ReportData
from services.load_model import load_classification_model
from services.create_model import save_model
from utils.dataset_utils import split_data
from utils.file_utils import get_dataset_root_path, process_directories, process_image_and_label, join_path
from utils.file_utils import get_dataset_root_path, process_directories_in_cls, process_image_and_label_in_cls, join_path
from utils.slackMessage import send_slack_message
from utils.api_utils import send_data_call_api
import random
@ -17,7 +16,7 @@ router = APIRouter()
@router.post("/predict")
async def classification_predict(request: PredictRequest):
send_slack_message(f"predict 요청: {request}", status="success")
send_slack_message(f"cls predict 요청: {request}", status="success")
# 모델 로드
model = get_model(request)
@ -61,17 +60,16 @@ def process_prediction_result(result, image, label_map):
try:
label_data = LabelData(
version="0.0.0",
task_type="det",
task_type="cls",
shapes=[
{
"label": summary['name'],
"color": get_random_color(),
"points": [
[summary['box']['x1'], summary['box']['y1']],
[summary['box']['x2'], summary['box']['y2']]
[0, 0]
],
"group_id": label_map[summary['class']] if label_map else summary['class'],
"shape_type": "rectangle",
"shape_type": "point",
"flags": {}
}
for summary in result.summary()
@ -96,16 +94,9 @@ def get_random_color():
@router.post("/train")
async def classification_train(request: TrainRequest, http_request: Request):
async def classification_train(request: TrainRequest):
send_slack_message(f"train 요청{request}", status="success")
# Authorization 헤더에서 Bearer 토큰 추출
auth_header = http_request.headers.get("Authorization")
token = auth_header.split(" ")[1] if auth_header and auth_header.startswith("Bearer ") else None
# 레이블 맵
inverted_label_map = {value: key for key, value in request.label_map.items()} if request.label_map else None
send_slack_message(f"cls train 요청{request}", status="success")
# 데이터셋 루트 경로 얻기
dataset_root_path = get_dataset_root_path(request.project_id)
@ -117,10 +108,10 @@ async def classification_train(request: TrainRequest, http_request: Request):
model_categories = model.names
# 데이터 전처리
preprocess_dataset(dataset_root_path, model_categories, request.data, request.ratio, inverted_label_map)
preprocess_dataset(dataset_root_path, model_categories, request.data, request.ratio)
# 학습
results = run_train(request,token,model,dataset_root_path)
results = run_train(request,model,dataset_root_path)
# best 모델 저장
model_key = save_model(project_id=request.project_id, path=join_path(dataset_root_path, "result", "weights", "best.pt"))
@ -132,30 +123,30 @@ async def classification_train(request: TrainRequest, http_request: Request):
return response
def preprocess_dataset(dataset_root_path, model_categories, data, ratio, label_map):
def preprocess_dataset(dataset_root_path, model_categories, data, ratio):
try:
# 디렉토리 생성 및 초기화
process_directories(dataset_root_path, model_categories)
process_directories_in_cls(dataset_root_path, model_categories)
# 학습 데이터 분류
train_data, val_data = split_data(data, ratio)
if not train_data or not val_data:
train_data, test_data = split_data(data, ratio)
if not train_data or not test_data:
raise HTTPException(status_code=400, detail="data split exception: data size is too small or \"ratio\" has invalid value")
# 학습 데이터 처리
for data in train_data:
process_image_and_label(data, dataset_root_path, "train", label_map)
process_image_and_label_in_cls(data, dataset_root_path, "train")
# 검증 데이터 처리
for data in val_data:
process_image_and_label(data, dataset_root_path, "val", label_map)
for data in test_data:
process_image_and_label_in_cls(data, dataset_root_path, "test")
except HTTPException as e:
raise e # HTTP 예외를 다시 발생
except Exception as e:
raise HTTPException(status_code=500, detail="preprocess dataset exception: " + str(e))
def run_train(request, token, model, dataset_root_path):
def run_train(request, model, dataset_root_path):
try:
# 데이터 전송 콜백함수
def send_data(trainer):
@ -171,17 +162,17 @@ def run_train(request, token, model, dataset_root_path):
# 로스 box_loss, cls_loss, dfl_loss
loss = trainer.label_loss_items(loss_items=trainer.loss_items)
data = ReportData(
epoch=trainer.epoch, # 현재 에포크
total_epochs=trainer.epochs, # 전체 에포크
box_loss=loss["train/box_loss"], # box loss
cls_loss=loss["train/cls_loss"], # cls loss
dfl_loss=loss["train/dfl_loss"], # dfl loss
fitness=trainer.fitness, # 적합도
epoch_time=trainer.epoch_time, # 지난 에포크 걸린 시간 (에포크 시작 기준으로 결정)
left_seconds=left_seconds # 남은 시간(초)
epoch=trainer.epoch, # 현재 에포크
total_epochs=trainer.epochs, # 전체 에포크
box_loss=0, # box loss
cls_loss=loss["train/loss"], # cls loss
dfl_loss=0, # dfl loss
fitness=trainer.fitness, # 적합도
epoch_time=trainer.epoch_time, # 지난 에포크 걸린 시간 (에포크 시작 기준으로 결정)
left_seconds=left_seconds # 남은 시간(초)
)
# 데이터 전송
send_data_call_api(request.project_id, request.m_id, data, token)
send_data_call_api(request.project_id, request.m_id, data)
except Exception as e:
raise HTTPException(status_code=500, detail=f"send_data exception: {e}")
@ -189,19 +180,15 @@ def run_train(request, token, model, dataset_root_path):
model.add_callback("on_train_epoch_start", send_data)
# 학습 실행
try:
results = model.train(
data=join_path(dataset_root_path, "dataset.yaml"),
name=join_path(dataset_root_path, "result"),
epochs=request.epochs,
batch=request.batch,
lr0=request.lr0,
lrf=request.lrf,
optimizer=request.optimizer
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"model train exception: {e}")
results = model.train(
data=dataset_root_path,
name=join_path(dataset_root_path, "result"),
epochs=request.epochs,
batch=request.batch,
lr0=request.lr0,
lrf=request.lrf,
optimizer=request.optimizer
)
# 마지막 에포크 전송
model.trainer.epoch += 1
send_data(model.trainer)

View File

@ -1,13 +1,12 @@
from fastapi import APIRouter, HTTPException, Request
from fastapi import APIRouter, HTTPException
from schemas.predict_request import PredictRequest
from schemas.train_request import TrainRequest
from schemas.predict_response import PredictResponse, LabelData
from schemas.train_request import TrainRequest, TrainDataInfo
from schemas.predict_response import PredictResponse, LabelData, Shape
from schemas.train_report_data import ReportData
from schemas.train_response import TrainResponse
from services.load_model import load_detection_model
from services.create_model import save_model
from utils.dataset_utils import split_data
from utils.file_utils import get_dataset_root_path, process_directories, process_image_and_label, join_path
from utils.file_utils import get_dataset_root_path, process_directories, join_path, process_image_and_label
from utils.slackMessage import send_slack_message
from utils.api_utils import send_data_call_api
import random
@ -21,14 +20,14 @@ async def detection_predict(request: PredictRequest):
send_slack_message(f"predict 요청: {request}", status="success")
# 모델 로드
model = get_model(request)
# 모델 레이블 카테고리 연결
classes = list(request.label_map) if request.label_map else None
model = get_model(request.project_id, request.m_key)
# 이미지 데이터 정리
url_list = list(map(lambda x:x.image_url, request.image_list))
# 이 값을 모델에 입력하면 해당하는 클래스 id만 출력됨
classes = get_classes(request.label_map, model.names)
# 추론
results = run_predictions(model, url_list, request, classes)
@ -38,11 +37,18 @@ async def detection_predict(request: PredictRequest):
return response
# 모델 로드
def get_model(request: PredictRequest):
def get_model(project_id, model_key):
try:
return load_detection_model(request.project_id, request.m_key)
return load_detection_model(project_id, model_key)
except Exception as e:
raise HTTPException(status_code=500, detail="load model exception: " + str(e))
raise HTTPException(status_code=500, detail="exception in get_model(): " + str(e))
# 모델의 레이블로부터 label_map의 key에 존재하는 값의 id만 가져오기
def get_classes(label_map:dict[str: int], model_names: dict[int, str]):
try:
return [id for id, name in model_names.items() if name in label_map]
except Exception as e:
raise HTTPException(status_code=500, detail="exception in get_classes(): " + str(e))
# 추론 실행 함수
def run_predictions(model, image, request, classes):
@ -54,7 +60,7 @@ def run_predictions(model, image, request, classes):
classes=classes
)
except Exception as e:
raise HTTPException(status_code=500, detail="model predict exception: " + str(e))
raise HTTPException(status_code=500, detail="exception in run_predictions: " + str(e))
# 추론 결과 처리 함수
@ -64,17 +70,17 @@ def process_prediction_result(result, image, label_map):
version="0.0.0",
task_type="det",
shapes=[
{
"label": summary['name'],
"color": get_random_color(),
"points": [
Shape(
label= summary['name'],
color= get_random_color(),
points= [
[summary['box']['x1'], summary['box']['y1']],
[summary['box']['x2'], summary['box']['y2']]
],
"group_id": label_map[summary['class']] if label_map else summary['class'],
"shape_type": "rectangle",
"flags": {}
}
group_id= label_map[summary['name']],
shape_type= "rectangle",
flags= {}
)
for summary in result.summary()
],
split="none",
@ -82,8 +88,10 @@ def process_prediction_result(result, image, label_map):
imageWidth=result.orig_img.shape[1],
imageDepth=result.orig_img.shape[2]
)
except KeyError as e:
raise HTTPException(status_code=500, detail="KeyError: " + str(e))
except Exception as e:
raise HTTPException(status_code=500, detail="model predict exception: " + str(e))
raise HTTPException(status_code=500, detail="exception in process_prediction_result(): " + str(e))
return PredictResponse(
image_id=image.image_id,
@ -94,78 +102,68 @@ def get_random_color():
random_number = random.randint(0, 0xFFFFFF)
return f"#{random_number:06X}"
@router.post("/train")
async def detection_train(request: TrainRequest):
send_slack_message(f"train 요청{request}", status="success")
# Authorization 헤더에서 Bearer 토큰 추출
# 데이터셋 루트 경로 얻기 (프로젝트 id 기반)
dataset_root_path = get_dataset_root_path(request.project_id)
# 모델 로드
model = get_model(request)
# 이 값을 학습할때 넣으면 이 카테고리들이 학습됨
names = list(request.label_map)
# 데이터 전처리: 학습할 디렉토리 & 데이터셋 를 생성
process_directories(dataset_root_path, names)
# 데이터 전처리: 데이터를 학습데이터와 검증데이터로 분류
train_data, val_data = split_data(request.data, request.ratio)
# 데이터 전처리: 데이터 이미지 및 레이블 다운로드
download_data(train_data, val_data, dataset_root_path, request.label_map)
# 학습
results = run_train(request, model,dataset_root_path)
# best 모델 저장
model_key = save_model(project_id=request.project_id, path=join_path(dataset_root_path, "result", "weights", "best.pt"))
result = results.results_dict
response = TrainResponse(
modelKey=model_key,
precision= result["metrics/precision(B)"],
recall= result["metrics/recall(B)"],
mAP50= result["metrics/mAP50(B)"],
mAP5095= result["metrics/mAP50-95(B)"],
fitness= result["fitness"]
)
send_slack_message(f"train 성공{response}", status="success")
return response
def split_data(data:list[TrainDataInfo], ratio:float):
try:
# 레이블 맵
inverted_label_map = {value: key for key, value in request.label_map.items()} if request.label_map else None
# 데이터셋 루트 경로 얻기
dataset_root_path = get_dataset_root_path(request.project_id)
# 모델 로드
model = get_model(request)
# 학습할 모델 카테고리, 카테고리가 추가되는 경우 추가 작업 필요
model_categories = model.names
# 데이터 전처리
preprocess_dataset(dataset_root_path, model_categories, request.data, request.ratio, inverted_label_map)
# 학습
results = run_train(request, model,dataset_root_path)
# best 모델 저장
model_key = save_model(project_id=request.project_id, path=join_path(dataset_root_path, "result", "weights", "best.pt"))
result = results.results_dict
response = TrainResponse(
modelKey=model_key,
precision= result["metrics/precision(B)"],
recall= result["metrics/recall(B)"],
mAP50= result["metrics/mAP50(B)"],
mAP5095= result["metrics/mAP50-95(B)"],
fitness= result["fitness"]
)
send_slack_message(f"train 성공{response}", status="success")
return response
except HTTPException as e:
raise e
train_size = int(ratio * len(data))
random.shuffle(data)
train_data = data[:train_size]
val_data = data[train_size:]
return train_data, val_data
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
raise HTTPException(status_code=500, detail="exception in split_data(): " + str(e))
def preprocess_dataset(dataset_root_path, model_categories, data, ratio, label_map):
def download_data(train_data:list[TrainDataInfo], val_data:list[TrainDataInfo], dataset_root_path:str):
try:
# 디렉토리 생성 및 초기화
process_directories(dataset_root_path, model_categories)
# 학습 데이터 분류
train_data, val_data = split_data(data, ratio)
if not train_data or not val_data:
raise HTTPException(status_code=400, detail="data split exception: data size is too small or \"ratio\" has invalid value")
# 학습 데이터 처리
for data in train_data:
process_image_and_label(data, dataset_root_path, "train", label_map)
process_image_and_label(data, dataset_root_path, "train")
# 검증 데이터 처리
for data in val_data:
process_image_and_label(data, dataset_root_path, "val", label_map)
except HTTPException as e:
raise e # HTTP 예외를 다시 발생
process_image_and_label(data, dataset_root_path, "val")
except Exception as e:
raise HTTPException(status_code=500, detail="preprocess dataset exception: " + str(e))
raise HTTPException(status_code=500, detail="exception in download_data(): " + str(e))
def run_train(request, model, dataset_root_path):
try:
@ -195,7 +193,7 @@ def run_train(request, model, dataset_root_path):
# 데이터 전송
send_data_call_api(request.project_id, request.m_id, data)
except Exception as e:
raise HTTPException(status_code=500, detail=f"send_data exception: {e}")
raise HTTPException(status_code=500, detail=f"exception in send_data(): {e}")
# 콜백 등록
model.add_callback("on_train_epoch_start", send_data)
@ -220,6 +218,6 @@ def run_train(request, model, dataset_root_path):
except HTTPException as e:
raise e # HTTP 예외를 다시 발생
except Exception as e:
raise HTTPException(status_code=500, detail=f"run_train exception: {e}")
raise HTTPException(status_code=500, detail=f"exception in run_train(): {e}")

View File

@ -10,7 +10,7 @@ router = APIRouter()
@router.get("/info/projects/{project_id}/models/{model_key}", summary= "모델 관련 정보 반환")
def get_model_info(project_id:int, model_key:str):
model_path = join_path("resources","projects",project_id, "models", model_key)
model_path = join_path("resources","projects", str(project_id), "models", model_key)
try:
model = load_model(model_path=model_path)
except FileNotFoundError:
@ -32,9 +32,9 @@ def get_model_list(project_id:int):
@router.post("/projects/{project_id}", status_code=201)
def create_model(project_id: int, request: ModelCreateRequest):
if request.project_type not in ["segmentation", "detection", "classfication"]:
if request.project_type not in ["segmentation", "detection", "classification"]:
raise HTTPException(status_code=400,
detail= f"Invalid type '{request.type}'. Must be one of \"segmentation\", \"detection\", \"classfication\".")
detail= f"Invalid type '{request.type}'. Must be one of \"segmentation\", \"detection\", \"classification\".")
model_key = create_new_model(project_id, request.project_type, request.pretrained)
return {"model_key": model_key}

View File

@ -6,7 +6,6 @@ from schemas.train_report_data import ReportData
from schemas.train_response import TrainResponse
from services.load_model import load_segmentation_model
from services.create_model import save_model
from utils.dataset_utils import split_data
from utils.file_utils import get_dataset_root_path, process_directories, process_image_and_label, join_path
from utils.slackMessage import send_slack_message
from utils.api_utils import send_data_call_api

View File

@ -4,6 +4,7 @@ from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException
from api.yolo.detection import router as yolo_detection_router
from api.yolo.segmentation import router as yolo_segmentation_router
from api.yolo.classfication import router as yolo_classification_router
from api.yolo.model import router as yolo_model_router
from utils.slackMessage import send_slack_message
@ -12,6 +13,7 @@ app = FastAPI()
# 각 기능별 라우터를 애플리케이션에 등록
app.include_router(yolo_detection_router, prefix="/api/detection", tags=["Detection"])
app.include_router(yolo_segmentation_router, prefix="/api/segmentation", tags=["Segmentation"])
app.include_router(yolo_classification_router, prefix="/api/classification", tags=["Classification"])
app.include_router(yolo_model_router, prefix="/api/model", tags=["Model"])

View File

@ -8,8 +8,8 @@ class ImageInfo(BaseModel):
class PredictRequest(BaseModel):
project_id: int
m_key: str = Field("yolo8", alias="model_key")
label_map: dict[int, int] = Field(None, description="모델 레이블 카테고리 idx: 프로젝트 레이블 카테고리 idx , None 일경우 모델 레이블 카테고리 idx로 레이블링")
image_list: list[ImageInfo]
conf_threshold: float = 0.25
m_key: str = Field("yolo8", alias="model_key") # model_ 로 시작하는 변수를 BaseModel의 변수로 만들경우 Warning 떠서 m_key로 대체
label_map: dict[str, int] = Field(..., description="프로젝트 레이블 이름: 프로젝트 레이블 pk , None일 경우 모델 레이블 카테고리 idx로 레이블링")
image_list: list[ImageInfo] # 이미지 리스트
conf_threshold: float = 0.25 #
iou_threshold: float = 0.45

View File

@ -10,7 +10,7 @@ class TrainRequest(BaseModel):
project_id: int
m_key: str = Field("yolo8", alias="model_key")
m_id: int = Field(..., alias="model_id") # 학습 중 에포크 결과를 보낼때 model_id를 보냄
label_map: dict[int, int] = Field({}, description="모델 레이블 카테고리 idx: 프로젝트 레이블 카테고리 idx , None 일경우 레이블 데이터(프로젝트 레이블)의 idx로 학습")
label_map: dict[str, int] = Field(..., description="프로젝트 레이블 이름: 프로젝트 레이블 pk , None일 경우 모델 레이블 카테고리 idx로 레이블링")
data: List[TrainDataInfo]
ratio: float = 0.8 # 훈련/검증 분할 비율

View File

@ -5,8 +5,9 @@ from services.load_model import load_model
def create_new_model(project_id: int, type:str, pretrained:bool):
suffix = ""
if type in ["seg", "cls"]:
suffix = "-"+type
type_list = {"segmentation": "seg", "classification": "cls"}
if type in type_list:
suffix = "-"+type_list[type]
# 학습된 기본 모델 로드
if pretrained:
suffix += ".pt"

View File

@ -10,7 +10,7 @@ def load_detection_model(project_id:int, model_key:str):
if model_key in default_model_map:
model = YOLO(default_model_map[model_key])
else:
model = load_model(model_path=os.path.join("projects",str(project_id),"models", model_key))
model = load_model(model_path=os.path.join("resources", "projects",str(project_id),"models", model_key))
# Detection 모델인지 검증
if model.task != "detect":
@ -23,13 +23,26 @@ def load_segmentation_model(project_id:int, model_key:str):
if model_key in default_model_map:
model = YOLO(default_model_map[model_key])
else:
model = load_model(model_path=os.path.join("projects",str(project_id),"models",model_key))
model = load_model(model_path=os.path.join("resources", "projects",str(project_id),"models",model_key))
# Segmentation 모델인지 검증
if model.task != "segment":
raise TypeError(f"Invalid model type: {model.task}. Expected a SegmentationModel.")
return model
def load_classification_model(project_id:int, model_key:str):
default_model_map = {"yolo8": os.path.join("resources","models","yolov8n-cls.pt")}
# 디폴트 모델 확인
if model_key in default_model_map:
model = YOLO(default_model_map[model_key])
else:
model = load_model(model_path=os.path.join("resources", "projects",str(project_id),"models",model_key))
# Segmentation 모델인지 검증
if model.task != "classify":
raise TypeError(f"Invalid model type: {model.task}. Expected a ClassificationModel.")
return model
def load_model(model_path: str):
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found at path: {model_path}")

View File

@ -1,8 +0,0 @@
import random
def split_data(data:list, ratio:float):
train_size = int(ratio * len(data))
random.shuffle(data)
train_data = data[:train_size]
val_data = data[train_size:]
return train_data, val_data

View File

@ -24,7 +24,7 @@ def make_yml(path:str, model_categories):
data = {
"train": f"{path}/train",
"val": f"{path}/val",
"nc": 80,
"nc": len(model_categories),
"names": model_categories
}
with open(os.path.join(path, "dataset.yaml"), 'w') as f:
@ -39,7 +39,7 @@ def process_directories(dataset_root_path:str, model_categories:list[str]):
shutil.rmtree(os.path.join(dataset_root_path, "result"))
make_yml(dataset_root_path, model_categories)
def process_image_and_label(data:TrainDataInfo, dataset_root_path:str, child_path:str, label_map:dict[int, int]|None):
def process_image_and_label(data:TrainDataInfo, dataset_root_path:str, child_path:str):
"""이미지 저장 및 레이블 파일 생성"""
# 이미지 url로부터 파일명 분리
img_name = data.image_url.split('/')[-1]
@ -60,11 +60,11 @@ def process_image_and_label(data:TrainDataInfo, dataset_root_path:str, child_pat
# 레이블 -> 학습용 레이블 데이터 파싱 후 생성
if label['task_type'] == "det":
create_detection_train_label(label, label_path, label_map)
create_detection_train_label(label, label_path)
elif label["task_type"] == "seg":
create_segmentation_train_label(label, label_path, label_map)
create_segmentation_train_label(label, label_path)
def create_detection_train_label(label:dict, label_path:str, label_map:dict[int, int]|None):
def create_detection_train_label(label:dict, label_path:str):
with open(label_path, "w") as train_label_txt:
for shape in label["shapes"]:
train_label = []
@ -72,18 +72,18 @@ def create_detection_train_label(label:dict, label_path:str, label_map:dict[int,
y1 = shape["points"][0][1]
x2 = shape["points"][1][0]
y2 = shape["points"][1][1]
train_label.append(str(label_map[shape["group_id"]]) if label_map else str(shape["group_id"])) # label Id
train_label.append(str(shape["group_id"])) # label Id
train_label.append(str((x1 + x2) / 2 / label["imageWidth"])) # 중심 x 좌표
train_label.append(str((y1 + y2) / 2 / label["imageHeight"])) # 중심 y 좌표
train_label.append(str((x2 - x1) / label["imageWidth"])) # 너비
train_label.append(str((y2 - y1) / label["imageHeight"] )) # 높이
train_label_txt.write(" ".join(train_label)+"\n")
def create_segmentation_train_label(label:dict, label_path:str, label_map:dict[int, int]|None):
def create_segmentation_train_label(label:dict, label_path:str):
with open(label_path, "w") as train_label_txt:
for shape in label["shapes"]:
train_label = []
train_label.append(str(label_map[shape["group_id"]]) if label_map else str(shape["group_id"])) # label Id
train_label.append(str(shape["group_id"])) # label Id
for x, y in shape["points"]:
train_label.append(str(x / label["imageWidth"]))
train_label.append(str(y / label["imageHeight"]))
@ -117,3 +117,30 @@ def get_file_name(path):
if not os.path.exists(path):
raise FileNotFoundError()
return os.path.basename(path)
def process_directories_in_cls(dataset_root_path:str, model_categories:dict[int,str]):
"""classification 학습을 위한 디렉토리 생성"""
make_dir(dataset_root_path, init=False)
for category in model_categories.values():
make_dir(os.path.join(dataset_root_path, "train", category), init=True)
make_dir(os.path.join(dataset_root_path, "test", category), init=True)
if os.path.exists(os.path.join(dataset_root_path, "result")):
shutil.rmtree(os.path.join(dataset_root_path, "result"))
def process_image_and_label_in_cls(data:TrainDataInfo, dataset_root_path:str, child_path:str):
"""이미지 저장 및 레이블 파일 생성"""
# 이미지 url로부터 파일명 분리
img_name = data.image_url.split('/')[-1]
# 레이블 객체 불러오기
label = json.loads(urllib.request.urlopen(data.data_url).read())
label_name = label["shapes"][0]["label"]
label_path = os.path.join(dataset_root_path,child_path,label_name)
# url로부터 이미지 다운로드
urllib.request.urlretrieve(data.image_url, os.path.join(label_path, img_name))
def download_image(url, path):
urllib.request.urlretrieve(url, path)