A.I./TensorFlow
Keras 3 문서) Introduction to Keras for engineers
채소장사
2024. 9. 19. 18:13
엔지니어를 위한 케라스 소개
- 원문 : Introduction to Keras for engineers
- 케라스는 TensorFlow, JAX, PyTorch 위에서 상호 호환적으로 동작할 수 있는 딥러닝 프레임워크다.
- (구글 코랩에서 실행되는) 이 문서는 Keras 3를 사용할 때의 작업 흐름을 보여준다.
- 참고) 코랩에서 실행한 결과는 다음에서 확인해볼 수 있다.
Setup
import numpy as np
import os
os.environ["KERAS_BACKEND"] = "jax"
import keras
- 이 문서에서는 케라스가 실행되는 백엔드 프레임워크로서 JAX를 사용한다.
"jax"
를"tensorflow"
또는"torch"
로 바꾼 후에 런타임 재실행(Restrat runtime)하면 다른 백엔드를 선택할 수 있다.
- 이처럼 케라스의 코드는 어떤 백엔드 프레임워크를 선택하는지와 상관없이(backend-agnostic) 작성할 수 있다.
- 주의) 케라스를 불러오는(import) 것은 반드시 백엔드 설정이 끝난 후에 이뤄져야 한다.
첫 번째 예제 : MNIST convnet
- 잘 알려진 MNIST는 28x28 크기의 그레이스케일(greyscale) 이미지 60,000장으로 구성되어 있다. 훈련 데이터와 동일하게 10개의 숫자로 구성된 테스트 이미지는 10,000장이며, 자세한 사항은 MNIST 홈페이지를 참고한다.
- MNIST는 0~9 사이의 숫자가 기록된 디지털 이미지 분류를 위한 데이터셋이다.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
- 받아진 60,000장의 훈련데이터와 10,000장의 테스트 데이터는 넘파이(NumPy) 배열 형태이다.
- 이 때, 각 데이터는 0부터 255 사이의 정수값(
uint8
)로 되어 있다. - 좀 더 효과적인 학습을 위해서는 디지털 이미지의 픽셀값을 정수에서 0~1 사이의 실수값으로 바꾸고
- 높이(H: height)와 너비(W:width)로만 이뤄진 2차원 행렬을, 채널(C:channel) 차원이 추가된 3차원 행렬(H x W x C) 형태로 변경할 필요가 있다.
- 이제 모델을 정의한다. 케라스가 제공하는 모델 작성 방식은 다음과 같다.
num_classes = 10
input_shape = (28, 28, 1)
model = keras.Sequential(
[
keras.layers.Input(shape=input_shape),
keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
keras.layers.MaxPooling2D(pool_size=(2, 2)),
keras.layers.Conv2D(128, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(128, kernel_size=(3, 3), activation="relu"),
keras.layers.GlobalAveragePooling2D(),
keras.layers.Dropout(0.5),
keras.layers.Dense(num_classes, activation="softmax"),
]
)
- Sequential API를 이용하여 모델을 정의할 때 (배치 크기를 제외한) 입력의 크기를 지정해주면, 모델을 빌드(build)하기 전에도 각 레이어의 출력형태, 파라미터 수 등의 정보를 얻을 수 있다.
- 이 것이 위에서 Input 레이어(
keras.layers.Input()
)가 쓰인 이유이며, 자세한 내용은 아래 문서를 참고한다.
- 이 것이 위에서 Input 레이어(
model.summary()
로 정의된 모델 정보를 출력하면 다음과 같다.
compile()
메소드를 사용하여, 옵티마이저(optimizer), 손실함수(loss function), 평가를 위한 메트릭(metrics)을 지정한다.compile()
의jit_compile
옵션에 따라서 XLA 컴파일러 사용여부가 정해진다.- 따로 지정되지 않은 경우, 디폴트 값으로 지정된
auto
에 따라서- JAX와 TensorFlow 백엔드 사용시에는 XLA로 모델을 컴파일하지만
- PyTorch 백엔드 사용시에는 최적화 컴파일링 없이 즉시 실행(eager execution) 모드로 컴파일된다.
- 모델 가속화를 위한 xla(Accelerated Linear Algebra)에 대해서는 OpenXLA 문서를 참고한다.
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
metrics=[
keras.metrics.SparseCategoricalAccuracy(name="acc"),
],
)
- 이제 모델을 훈련시키고 평가할 수 있다. 이 문서에서는 훈련 간에 새로운(unseen) 데이터에 대한 일반화(generalization) 능력을 알아보기 위해, (훈련)데이터의 15%를 검증(validation) 데이터로 지정한다.
batch_size = 128
epochs = 20
callbacks = [
keras.callbacks.ModelCheckpoint(filepath="model_at_epoch_{epoch}.keras"),
keras.callbacks.EarlyStopping(monitor="val_loss", patience=2),
]
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=epochs,
validation_split=0.15,
callbacks=callbacks,
)
score = model.evaluate(x_test, y_test, verbose=0)
- 위에 사용된
ModelCheckpoint
콜백의 설정에 따라, 매 에포크가 끝날 때 훈련된 모델이 저장된다. - 콜백을 사용하는 대신에, 학습이 끝난 후에 모델을 직접 저장할 수 있다.
model.save("final_model.keras")
- 저장된 모델을 불러와서(load)하여, 사용할 수 있다.
model = keras.saving.load_model("final_model.keras")
predict()
를 이용하면, 각 클래스에 대한 확률과 함께 예측된 결과를 볼 수 있다.
predictions = model.predict(x_test)
프레임워크와 상관없이 동작하는 커스텀 요소 작성
- 케라스를 통해 백엔드 프레임워크의 종류에 상관없이, 개발자의 의도대로 구성된 커스텀 레이어, 모델, 평가 메트릭, 손실함수, 옵티마이저 등을 정의하고 사용할 수 있다.
- 이 때 사용되는
keras.ops
네임스페이스는- NumPy API와 동일한 실행이 정의된 함수들 : 예)
keras.ops.stack
,keras.ops.matmul
- NumPy에서 제공되지 않는 신경망을 위한 연산들 : 예)
keras.ops.conv
,keras.ops.binary_crossentropy
- NumPy API와 동일한 실행이 정의된 함수들 : 예)
- 아래는 모든 백엔드에서 동작사는 커스텀 Dense 레이어를 직접 작성한 경우다.
class MyDense(keras.layers.Layer):
def __init__(self, units, activation=None, name=None):
super().__init__(name=name)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, input_shape):
input_dim = input_shape[-1]
self.w = self.add_weight(
shape=(input_dim, self.units),
initializer=keras.initializers.GlorotNormal(),
name="kernel",
trainable=True,
)
self.b = self.add_weight(
shape=(self.units,),
initializer=keras.initializers.Zeros(),
name="bias",
trainable=True,
)
def call(self, inputs):
x = keras.ops.matmul(inputs, self.w) + self.b
return self.activation(x)
keras.random
네임스페이스의 함수들을 이용하여, 커스텀 Dropout 레이어를 만들 수 있다.
class MyDropout(keras.layers.Layer):
def __init__(self, rate, name=None):
super().__init__(name=name)
self.rate = rate
self.seed_generator = keras.random.SeedGenerator(1337)
def call(self, inputs):
return keras.random.dropout(inputs, self.rate, seed=self.seed_generator)
- 이렇게 정의된 커스텀 요소들을 사용하는 모델을 서브클래스 방식으로 구성할 수 있다.
class MyModel(keras.Model):
def __init__(self, num_classes):
super().__init__()
self.conv_base = keras.Sequential(
[
keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
keras.layers.MaxPooling2D(pool_size=(2, 2)),
keras.layers.Conv2D(128, kernel_size=(3, 3), activation="relu"),
keras.layers.Conv2D(128, kernel_size=(3, 3), activation="relu"),
keras.layers.GlobalAveragePooling2D(),
]
)
self.dp = MyDropout(0.5)
self.dense = MyDense(num_classes, activation="softmax")
def call(self, x):
x = self.conv_base(x)
x = self.dp(x)
return self.dense(x)
- 모델을 컴파일하고, 훈련하는 방법은 다르지 않다.
model = MyModel(num_classes=10)
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
metrics=[
keras.metrics.SparseCategoricalAccuracy(name="acc"),
],
)
model.fit(
x_train,
y_train,
batch_size=batch_size,
epochs=1, # For speed
validation_split=0.15,
)
임의의 데이터 입력형태에 대해 모델 훈련시키기
- 모든 케라스 모델은 다양한 형식의 데이터 포맷에 대해서 훈련 및 평가를 할 수 있으며, 이 역시 어떠한 백엔드를 사용하는지와 상관없다.
- NumPy 배열
- Pandas 데이터프레임(dataframe)
- 텐서플로의 tf.data.Dataset
- 파이토치의 DataLoader 객체
- 케라스의 PyDataset 객체
- 파이토치의
DataLoaders
를 사용하는 케라스의 방식이다.
import torch
train_torch_dataset = torch.utils.data.TensorDataset(
torch.from_numpy(x_train), torch.from_numpy(y_train)
)
val_torch_dataset = torch.utils.data.TensorDataset(
torch.from_numpy(x_test), torch.from_numpy(y_test)
)
train_dataloader = torch.utils.data.DataLoader(
train_torch_dataset, batch_size=batch_size, shuffle=True
)
val_dataloader = torch.utils.data.DataLoader(
val_torch_dataset, batch_size=batch_size, shuffle=False
)
model = MyModel(num_classes=10)
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
metrics=[
keras.metrics.SparseCategoricalAccuracy(name="acc"),
],
)
model.fit(train_dataloader, epochs=1, validation_data=val_dataloader)
- 텐서플로의
tf.data
를 이용하는 경우다.
import tensorflow as tf
train_dataset = (
tf.data.Dataset.from_tensor_slices((x_train, y_train))
.batch(batch_size)
.prefetch(tf.data.AUTOTUNE)
)
test_dataset = (
tf.data.Dataset.from_tensor_slices((x_test, y_test))
.batch(batch_size)
.prefetch(tf.data.AUTOTUNE)
)
model = MyModel(num_classes=10)
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(),
optimizer=keras.optimizers.Adam(learning_rate=1e-3),
metrics=[
keras.metrics.SparseCategoricalAccuracy(name="acc"),
],
)
model.fit(train_dataset, epochs=1, validation_data=test_dataset)