言語処理100本ノック 2020 第8章を解きました。

言語処理100本ノック 2020の第8章を解いたので、その解法と思ったことをつらつら書きます。

コードはこちら

70. 単語ベクトルの和による特徴量

普段はあまりやらないのですが、dataframeをdictで持つようにし、必要に応じて取り出す形で書きました。

# 70. 単語ベクトルの和による特徴量

import gensim
import numpy as np
import pandas as pd
import texthero as hero


def load_data(data_check=False) -> dict:
    """データの読み込み"""
    # 読み込むファイルを定義
    inputs = {
        "train": "../../ch06/50/train.txt",
        "valid": "../../ch06/50/valid.txt",
        "test": "../../ch06/50/test.txt",
    }

    dfs = {}
    for k, v in inputs.items():
        dfs[k] = pd.read_csv(v, sep="\t", index_col=0)

    # データチェック
    if data_check:
        for k in inputs.keys():
            print(k, "---", dfs[k].shape)
            print(dfs[k].head())

    return dfs


def preprocess(text: str) -> str:
    """前処理"""
    clean_text = hero.clean(
        text,
        pipeline=[
            hero.preprocessing.fillna,
            hero.preprocessing.lowercase,
            hero.preprocessing.remove_digits,
            hero.preprocessing.remove_punctuation,
            hero.preprocessing.remove_diacritics,
            hero.preprocessing.remove_stopwords,
        ],
    )
    return clean_text


def get_mean_vector(model, sentence: list):
    # remove out-of-vocabulary words
    words = [word for word in sentence if word in model.vocab]
    if len(words) >= 1:
        return np.mean(model[words], axis=0)
    else:
        return []


if __name__ == "__main__":

    name_list = ["train", "valid", "test"]

    dfs = load_data()

    model = gensim.models.KeyedVectors.load_word2vec_format(
        "../../ch07/60/GoogleNews-vectors-negative300.bin", binary=True
    )

    # 前処理
    for name in name_list:
        dfs[name]["TITLE"] = dfs[name][["TITLE"]].apply(preprocess)

    # 分かち書き
    for name in name_list:
        dfs[name]["TITLE_SPLIT"] = [
            text.split(" ") for text in dfs[name]["TITLE"].tolist()
        ]

    # 特徴量行列の取得
    for name in name_list:
        dfs[name]["TITLE_VECTOR"] = [
            get_mean_vector(model, text) for text in dfs[name]["TITLE_SPLIT"].tolist()
        ]

    # ラベル変換
    label_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    for name in name_list:
        dfs[name]["CATEGORY"] = dfs[name]["CATEGORY"].map(label_dict)

    # データの保存
    for name in name_list:
        # 特徴量行列
        np.save(f"{name}_vector", np.stack(dfs[name]["TITLE_VECTOR"]))
        # ラベル
        np.save(f"{name}_label", np.stack(dfs[name]["CATEGORY"]))

71. 単層ニューラルネットワークによる予測

matmulを使って内積を直接計算しています。bias無しのnn.Linearの方が実用的かと思いますが、理解のため。

# 71. 単層ニューラルネットワークによる予測

import numpy as np
import torch
from torch import nn

if __name__ == "__main__":

    # Xの読み込み
    X = np.load("../70/train_vector.npy")
    X = torch.tensor(X, requires_grad=True)

    # Wの生成
    W = torch.randn(300, 4)

    # XとWの内積
    XW = torch.matmul(X, W)

    # 行方向のsoftmaxの演算
    m = nn.Softmax(dim=1)
    output = m(XW)

    print(output)
    # tensor([[0.2043, 0.3957, 0.2756, 0.1245],
    #     [0.1173, 0.2283, 0.5908, 0.0636],
    #     [0.0679, 0.4003, 0.4292, 0.1027],
    #     ...,
    #     [0.2458, 0.4192, 0.2750, 0.0599],
    #     [0.0954, 0.6650, 0.1910, 0.0486],
    #     [0.0869, 0.3767, 0.2606, 0.2759]])

    torch.save(output, "71.pt")

72. 損失と勾配の計算

output = loss(X, Y)の計算後、backward()を行うと、Xに関する勾配の計算をpytorchが裏で勝手にやってくれます。
X.gradで計算された勾配が確認できます。

# 72. 損失と勾配の計算

import numpy as np
import torch
from torch import nn

if __name__ == "__main__":

    # 事例集合x1,x2,x3,x4の読み込み
    X = torch.load("../71/71.pt")

    # 正解データの読み込み
    Y = np.load("../70/train_label.npy")
    Y = torch.from_numpy(Y)

    # lossの計算
    loss = nn.CrossEntropyLoss()
    output = loss(X, Y)

    print(output)
    # tensor(1.3074, grad_fn=<NllLossBackward>)

    # 勾配の計算
    output.backward()  # Xはrequired_gradがTrueになっているため、outputの計算元であるXに関する勾配を裏で計算している
    print(X.grad)
    # tensor([[-7.1343e-05,  3.2402e-05,  1.8923e-05,  2.0019e-05],
    #     [-7.0948e-05,  1.8294e-05,  2.2431e-05,  3.0223e-05],
    #     [-7.6297e-05,  1.6931e-05,  1.7497e-05,  4.1869e-05],
    #     ...,
    #     [-6.5094e-05,  2.0229e-05,  2.3246e-05,  2.1619e-05],
    #     [-7.1032e-05,  3.0666e-05,  1.9752e-05,  2.0614e-05],
    #     [-7.1319e-05,  1.8884e-05,  1.9923e-05,  3.2512e-05]])

73. 確率的勾配降下法による学習

state_dict()["weight"]でmodelのweightを取得できます。 optimizer.step()でstate_dict()["weight"]が更新されます。

# 73. 確率的勾配降下法による学習

import numpy as np
import torch
from torch import nn

if __name__ == "__main__":

    # 学習データの読み込み
    X = np.load("../70/train_vector.npy")
    X = torch.tensor(X, requires_grad=True)
    # torch.Size([10672, 300])

    # 正解データの読み込み
    y = np.load("../70/train_label.npy")
    y = torch.from_numpy(y)
    # torch.Size([10672])

    # modelの設定
    # y=xA^T + b
    # 今回の例の場合、
    # x:torch.Size([10672, 300])
    # A:torch.Size([4, 300]) これはnet.parameters().__next__().size()によって求まる
    # A^T:torch.Size([300, 4])
    # よって、
    # xA^T:torch.Size([10672, 4])
    net = nn.Linear(300, 4)

    # loss, optimizerの設定
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)

    # parameterの更新
    print("Before")
    print(net.state_dict()["weight"])
    # torch.Size([4, 300])
    # tensor([[ 0.0508, -0.0503,  0.0018,  ..., -0.0246, -0.0344,  0.0418],
    #         [-0.0468,  0.0374, -0.0137,  ...,  0.0034, -0.0196,  0.0232],
    #         [-0.0145,  0.0146, -0.0559,  ..., -0.0218,  0.0179,  0.0148],
    #         [-0.0107, -0.0198,  0.0443,  ...,  0.0069, -0.0069, -0.0463]])
    # それぞれの特徴ベクトルが、どの正解データっぽいかをなんとなく表現
    # 例えば、label0っぽさに対する重み。
    # 0.0508は,xの0番目の特徴ベクトルに対してこの数字をかける。
    # 正解labelが0のとき、0番目の特徴ベクトルはどういう数字になるかを考慮してこの重みは決まる。

    for step in range(100):
        optimizer.zero_grad()

        y_pred = net(X)

        output = loss(y_pred, y)
        output.backward()

        optimizer.step()

    print("After")
    print(net.state_dict()["weight"])
    # tensor([[ 0.0461, -0.0476, -0.0025,  ..., -0.0167, -0.0249,  0.0266],
    #     [-0.0437,  0.0316, -0.0109,  ...,  0.0063, -0.0224,  0.0204],
    #     [-0.0132,  0.0228, -0.0573,  ..., -0.0390,  0.0105,  0.0317],
    #     [-0.0104, -0.0249,  0.0471,  ...,  0.0133, -0.0063, -0.0451]])

    net_path = "73_net.pth"
    torch.save(net.state_dict(), net_path)

74. 正解率の計測

# 74. 正解率の計測

import numpy as np
import torch
from torch import nn

if __name__ == "__main__":

    # modelの設定
    net = nn.Linear(300, 4)

    net_path = "../73/73_net.pth"
    net.load_state_dict(torch.load(net_path))

    # 学習データの読み込み
    train_x = torch.tensor(np.load("../70/train_vector.npy"), requires_grad=True)

    train_y = torch.tensor(np.load("../70/train_label.npy"))

    # 評価データの読み込み
    test_x = torch.tensor(np.load("../70/test_vector.npy"), requires_grad=True)

    test_y = torch.tensor(np.load("../70/test_label.npy"))

    # 学習データに対する予測
    train_pred_prob = net(train_x)
    _, train_pred = torch.max(train_pred_prob, 1)

    # 学習データに対する正解率の計算
    train_correct_num = (train_pred == train_y).sum().item()
    train_size = train_y.size(0)
    train_acc = (train_correct_num / train_size) * 100
    print(f"train acc:{train_acc: .2f}%")
    # train acc: 72.84%

    # 評価データに対する予測
    test_pred_prob = net(test_x)
    _, test_pred = torch.max(test_pred_prob, 1)

    # 評価データに対する正解率の計算
    test_correct_num = (test_pred == test_y).sum().item()
    test_size = test_y.size(0)
    test_acc = (test_correct_num / test_size) * 100
    print(f"test acc:{test_acc: .2f}%")
    # test acc: 68.29%

75. 損失と正解率のプロット

素直に損失と正解率を計算、描画します。

# 75. 損失と正解率のプロット

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn


def calc_acc(y_pred_prob, y_true) -> float:
    """予測のtensorの正解のtensorを用いて、正解率を計算する"""
    # 最も正解率の高い予測確率を正解ラベルとする。
    _, y_pred = torch.max(y_pred_prob, 1)

    # 学習データに対する正解率の計算
    correct_num = (y_pred == y_true).sum().item()
    total_size = y_true.size(0)
    acc = (correct_num / total_size) * 100
    return acc


def make_graph(value_dict: dict, value_name: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "valid"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch")
    plt.legend()
    plt.savefig(f"{value_name}.png")
    plt.close()


if __name__ == "__main__":

    # 学習データの読み込み
    train_x = torch.tensor(np.load("../70/train_vector.npy"), requires_grad=True)

    train_y = torch.tensor(np.load("../70/train_label.npy"))

    # 検証データの読み込み
    valid_x = torch.tensor(np.load("../70/valid_vector.npy"), requires_grad=True)

    valid_y = torch.tensor(np.load("../70/valid_label.npy"))

    # modelの設定
    net = nn.Linear(300, 4)

    # loss, optimizerの設定
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)

    train_losses = []
    train_accs = []
    valid_losses = []
    valid_accs = []

    # parameterの更新
    for epoch in range(100):
        optimizer.zero_grad()

        train_y_pred_prob = net(train_x)

        # 訓練データでの損失の計算
        train_loss = loss(train_y_pred_prob, train_y)
        train_loss.backward()

        optimizer.step()

        # 訓練データでの損失の保存
        train_losses.append(train_loss.data)

        # 訓練データでの正解率の計算
        train_acc = calc_acc(train_y_pred_prob, train_y)
        # 訓練データでの正解率の保存
        train_accs.append(train_acc)

        # 検証データに対する予測
        valid_y_pred_prob = net(valid_x)

        # 検証データの損失の計算
        valid_loss = loss(valid_y_pred_prob, valid_y)
        # 検証データでの損失の保存
        valid_losses.append(valid_loss.data)

        # 検証データでの正解率の計算
        valid_acc = calc_acc(valid_y_pred_prob, valid_y)
        # 検証データでの正解率の保存
        valid_accs.append(valid_acc)

    # グラフへのプロット
    losses = {"train": train_losses, "valid": valid_losses}

    accs = {"train": train_accs, "valid": valid_accs}

    make_graph(losses, "losses")
    make_graph(accs, "accs")

グラフは下記のようになりました。

f:id:sinchir0:20210801161403p:plain
losses
f:id:sinchir0:20210801161418p:plain
accs

76. チェックポイント

20epoch毎にチェックポイントを設けます。

# 76. チェックポイント

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn


def calc_acc(y_pred_prob, y_true) -> float:
    """予測のtensorの正解のtensorを用いて、正解率を計算する"""
    # 最も正解率の高い予測確率を正解ラベルとする。
    _, y_pred = torch.max(y_pred_prob, 1)

    # 学習データに対する正解率の計算
    correct_num = (y_pred == y_true).sum().item()
    total_size = y_true.size(0)
    acc = (correct_num / total_size) * 100
    return acc


if __name__ == "__main__":

    # 学習データの読み込み
    train_x = torch.tensor(np.load("../70/train_vector.npy"), requires_grad=True)

    train_y = torch.tensor(np.load("../70/train_label.npy"))

    # 検証データの読み込み
    valid_x = torch.tensor(np.load("../70/valid_vector.npy"), requires_grad=True)

    valid_y = torch.tensor(np.load("../70/valid_label.npy"))

    # modelの設定
    net = nn.Linear(300, 4)

    # loss, optimizerの設定
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)

    train_losses = []
    train_accs = []
    valid_losses = []
    valid_accs = []

    # parameterの更新
    for epoch in range(100):
        optimizer.zero_grad()

        train_y_pred_prob = net(train_x)

        # 訓練データでの損失の計算
        train_loss = loss(train_y_pred_prob, train_y)
        train_loss.backward()

        optimizer.step()

        # 訓練データでの損失の保存
        train_losses.append(train_loss.data)

        # 訓練データでの正解率の計算
        train_acc = calc_acc(train_y_pred_prob, train_y)
        # 訓練データでの正解率の保存
        train_accs.append(train_acc)

        # 検証データに対する予測
        valid_y_pred_prob = net(valid_x)

        # 検証データの損失の計算
        valid_loss = loss(valid_y_pred_prob, valid_y)
        # 検証データでの損失の保存
        valid_losses.append(valid_loss.data)

        # 検証データでの正解率の計算
        valid_acc = calc_acc(valid_y_pred_prob, valid_y)
        # 検証データでの正解率の保存
        valid_accs.append(valid_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(net.state_dict(), f"76_net_epoch{epoch}.pth")
            torch.save(optimizer.state_dict(), f"76_optimizer_epoch{epoch}.pth")

77. ミニバッチ化

pytorchのdataloaderを用いてミニバッチ化を行います。
batchsizeの増加に伴って、計算時間が短くなっていくことも確認できました。

# 77.ミニバッチ化

import time

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset


class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


class Net(nn.Module):
    def __init__(self, in_shape: int, out_shape: int):
        super().__init__()
        self.fc = nn.Linear(300, 4, bias=True)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.fc(x)
        x = self.softmax(x)
        return x


def calc_acc(net, train_x, y_true) -> float:
    """modelと学習データ、正解データを用いて、正解率を計算する"""
    # 最も正解率の高い予測確率を正解ラベルとする。
    _, y_pred = torch.max(net(train_x), 1)

    # 学習データに対する正解率の計算
    correct_num = (y_pred == y_true).sum().item()
    total_size = y_true.size(0)
    acc = (correct_num / total_size) * 100
    return acc


if __name__ == "__main__":

    # 学習データの読み込み
    train_x = torch.tensor(np.load("../70/train_vector.npy"), requires_grad=True)
    train_y = torch.tensor(np.load("../70/train_label.npy"))

    # 評価データの読み込み
    valid_x = torch.tensor(np.load("../70/valid_vector.npy"), requires_grad=True)
    valid_y = torch.tensor(np.load("../70/valid_label.npy"))

    # modelの設定
    net = Net(in_shape=train_x.shape[1], out_shape=4)

    # loss, optimizerの設定
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)

    # DataLoaderの構築
    dataset = TextDataset(train_x, train_y)

    # parameterの更新
    batchsizes = [1, 2, 4, 8, 16, 32, 64, 128]
    for batchsize in batchsizes:
        loader = DataLoader(dataset, batch_size=batchsize, shuffle=True)

        train_losses = []
        train_accs = []
        valid_losses = []
        valid_accs = []

        for epoch in range(1):

            start = time.time()

            train_running_loss = 0.0
            valid_running_loss = 0.0

            for dataloader_x, dataloader_y in loader:
                """netの重みの学習をbatchsize単位で行う"""

                optimizer.zero_grad()

                dataloader_y_pred_prob = net(dataloader_x)

                # dataset_xでの損失の計算
                dataloader_loss = loss(dataloader_y_pred_prob, dataloader_y)
                dataloader_loss.backward()

                # 訓練データ、検証データでの損失の平均を計算する
                train_running_loss += dataloader_loss.item()
                valid_running_loss += loss(net(valid_x), valid_y).item()

                optimizer.step()

            # 訓練データでの損失の保存
            train_losses.append(train_running_loss)

            # 訓練データでの正解率の計算
            train_acc = calc_acc(net, train_x, train_y)
            # 訓練データでの正解率の保存
            train_accs.append(train_acc)

            # 検証データでの損失の保存
            valid_losses.append(valid_running_loss)

            # 検証データでの正解率の計算
            valid_acc = calc_acc(net, valid_x, valid_y)
            # 検証データでの正解率の保存
            valid_accs.append(valid_acc)

            # 20epoch毎にチェックポイントを生成
            if epoch % 20 == 0:
                torch.save(net.state_dict(), f"77_net_bs{batchsize}_epoch{epoch}.pth")
                torch.save(
                    optimizer.state_dict(),
                    f"77_optimizer_bs{batchsize}_epoch{epoch}.pth",
                )

            # 経過した時間を取得
            elapsed_time = time.time() - start
            print(f"batchsize{batchsize} time:{elapsed_time: .2f}")

            # batchsize1 time: 20.91
            # batchsize2 time: 26.53
            # batchsize4 time: 21.08
            # batchsize8 time: 18.27
            # batchsize16 time: 16.90
            # batchsize32 time: 16.23
            # batchsize64 time: 15.85
            # batchsize128 time: 15.68

78. GPU上での学習

train,valid, modelをdevice送りにします。
GPU上での動作はColabで確認。
GPUだと露骨に高速化されることも確認できます。

# 78. GPU上での学習
# 問題77のコードを改変し,GPU上で学習を実行せよ.

# Colabのコード
# https://colab.research.google.com/drive/1XCvdoUAJCnow-tmZkKnhqPxOVoa4-Tak?usp=sharing

import time

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset


class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


class Net(nn.Module):
    def __init__(self, in_shape: int, out_shape: int):
        super().__init__()
        self.fc = nn.Linear(300, 4, bias=True)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.fc(x)
        x = self.softmax(x)
        return x


def calc_acc(net, train_x, y_true) -> float:
    """modelと学習データ、正解データを用いて、正解率を計算する"""
    # 最も正解率の高い予測確率を正解ラベルとする。
    _, y_pred = torch.max(net(train_x), 1)

    # 学習データに対する正解率の計算
    correct_num = (y_pred == y_true).sum().item()
    total_size = y_true.size(0)
    acc = (correct_num / total_size) * 100
    return acc


if __name__ == "__main__":

    if not torch.cuda.is_available():
        print("No cuda")

    PATH = ".."

    device = (
        torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
    )

    # 学習データの読み込み
    train_x = torch.tensor(
        np.load(f"{PATH}/70/train_vector.npy"), requires_grad=True
    ).to(device)
    train_y = torch.tensor(np.load(f"{PATH}/70/train_label.npy")).to(device)

    # 評価データの読み込み
    valid_x = torch.tensor(
        np.load(f"{PATH}/70/valid_vector.npy"), requires_grad=True
    ).to(device)
    valid_y = torch.tensor(np.load(f"{PATH}/70/valid_label.npy")).to(device)

    # modelの設定
    net = Net(in_shape=train_x.shape[1], out_shape=4).to(device)

    # loss, optimizerの設定
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=0.01)

    # DataLoaderの構築
    dataset = TextDataset(train_x, train_y)

    # parameterの更新
    batchsizes = [1, 2, 4, 8, 16, 32, 64, 128]
    for batchsize in batchsizes:
        loader = DataLoader(dataset, batch_size=batchsize, shuffle=True)

        train_losses = []
        train_accs = []
        valid_losses = []
        valid_accs = []

        for epoch in range(1):

            start = time.time()

            train_running_loss = 0.0
            valid_running_loss = 0.0

            for dataloader_x, dataloader_y in loader:
                """netの重みの学習をbatchsize単位で行う"""

                optimizer.zero_grad()

                dataloader_y_pred_prob = net(dataloader_x)

                # dataset_xでの損失の計算
                dataloader_loss = loss(dataloader_y_pred_prob, dataloader_y)
                dataloader_loss.backward()

                # 訓練データ、検証データでの損失の平均を計算する
                train_running_loss += dataloader_loss.item()
                valid_running_loss += loss(net(valid_x), valid_y).item()

                optimizer.step()

            # 訓練データでの損失の保存
            train_losses.append(train_running_loss)

            # 訓練データでの正解率の計算
            train_acc = calc_acc(net, train_x, train_y)
            # 訓練データでの正解率の保存
            train_accs.append(train_acc)

            # 検証データでの損失の保存
            valid_losses.append(valid_running_loss)

            # 検証データでの正解率の計算
            valid_acc = calc_acc(net, valid_x, valid_y)
            # 検証データでの正解率の保存
            valid_accs.append(valid_acc)

            # 20epoch毎にチェックポイントを生成
            if epoch % 20 == 0:
                torch.save(net.state_dict(), f"77_net_bs{batchsize}_epoch{epoch}.pth")
                torch.save(
                    optimizer.state_dict(),
                    f"77_optimizer_bs{batchsize}_epoch{epoch}.pth",
                )

            # 経過した時間を取得
            elapsed_time = time.time() - start
            print(f"batchsize{batchsize} time:{elapsed_time: .2f}")

            # CPUのみ
            # batchsize1 time: 20.91
            # batchsize2 time: 26.53
            # batchsize4 time: 21.08
            # batchsize8 time: 18.27
            # batchsize16 time: 16.90
            # batchsize32 time: 16.23
            # batchsize64 time: 15.85
            # batchsize128 time: 15.68

            # GPU
            # batchsize1 time: 11.17
            # batchsize2 time: 6.64
            # batchsize4 time: 4.56
            # batchsize8 time: 3.49
            # batchsize16 time: 2.94
            # batchsize32 time: 2.68
            # batchsize64 time: 2.55
            # batchsize128 time: 2.48

79. 多層ニューラルネットワーク

色々作りました。コードは全部乗せ(表のthree_layer_dropout_bn_relu_resnet)だけ記載します。
精度比較は下記。
bn(batch normalization)を入れると精度が10%ポイント近く上がるのは違和感ありますね。

方法 train valid
one_layer 76.90 78.71
two_layer 77.32 79.46
three_layer 77.62 77.62
three_layer_dropout 77.26 79.99
three_layer_dropout_bn 89.29 89.29
three_layer_dropout_bn_relu 83.34 84.25
three_layer_dropout_bn_relu_resnet 84.21 84.21
# kaggle.com/qiaoshiji/resnet-deep

# 79. 多層ニューラルネットワーク
# 問題78のコードを改変し,バイアス項の導入や多層化など,ニューラルネットワークの形状を変更しながら,高性能なカテゴリ分類器を構築せよ.

from typing import Union

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


class Net(nn.Module):
    def __init__(self, in_shape: int, out_shape: int):
        super().__init__()
        self.fc1 = nn.Linear(300, 150, bias=True)
        self.dropout1 = nn.Dropout(0.25)
        self.bn1 = nn.BatchNorm1d(150)
        self.fc2 = nn.Linear(150, 150, bias=True)
        self.dropout2 = nn.Dropout(0.25)
        self.bn2 = nn.BatchNorm1d(150)
        self.fc3 = nn.Linear(300, 4, bias=True)
        self.softmax = nn.Softmax(dim=1)
        self.relu = nn.ReLU()

    def forward(self, x):
        x1 = self.fc1(x)
        x1 = self.dropout1(x1)
        x1 = self.bn1(x1)
        x1 = self.relu(x1)

        x2 = self.fc2(x1)
        x2 = self.dropout2(x2)
        x2 = self.bn2(x2)
        x2 = self.relu(x2)
        x2 = torch.cat([x1, x2], dim=1)

        x3 = self.fc3(x2)
        x3 = self.softmax(x3)

        return x3


def train_fn(model, loader, optimizer, loss) -> Union[float, float]:
    """model, loaderを用いて学習を行い、lossを返す"""
    train_running_loss = 0.0
    valid_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        optimizer.zero_grad()

        dataloader_y_pred_prob = model(dataloader_x)

        # dataset_xでの損失の計算
        dataloader_loss = loss(dataloader_y_pred_prob, dataloader_y)
        dataloader_loss.backward()

        # 訓練データ、検証データでの損失の平均を計算する
        train_running_loss += dataloader_loss.item() / len(loader)
        valid_running_loss += loss(model(valid_x), valid_y).item() / len(loader)

        optimizer.step()

    return train_running_loss, valid_running_loss


def calc_acc(model, train_x, y_true) -> float:
    """modelと学習データ、正解データを用いて、正解率を計算する"""
    # 最も正解率の高い予測確率を正解ラベルとする。
    _, y_pred = torch.max(model(train_x), 1)

    # 学習データに対する正解率の計算
    correct_num = (y_pred == y_true).sum().item()
    total_size = y_true.size(0)
    acc = (correct_num / total_size) * 100
    return acc


def make_graph(value_dict: dict, value_name: str, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "valid"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch")
    plt.legend()
    plt.savefig(f"{method}_{value_name}.png")
    plt.close()


if __name__ == "__main__":

    METHOD = "three_layer_linear_dropout_bn"

    if not torch.cuda.is_available():
        print("No cuda")

    PATH = ".."

    device = (
        torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
    )

    # 学習データの読み込み
    train_x = torch.tensor(
        np.load(f"{PATH}/70/train_vector.npy"), requires_grad=True
    ).to(device)
    train_y = torch.tensor(np.load(f"{PATH}/70/train_label.npy")).to(device)

    # 評価データの読み込み
    valid_x = torch.tensor(
        np.load(f"{PATH}/70/valid_vector.npy"), requires_grad=True
    ).to(device)
    valid_y = torch.tensor(np.load(f"{PATH}/70/valid_label.npy")).to(device)

    # modelの設定
    model = Net(in_shape=train_x.shape[1], out_shape=4).to(device)

    # loss, optimizerの設定
    loss = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # DataLoaderの構築
    dataset = TextDataset(train_x, train_y)

    # parameterの更新
    BATCHSIZE = 32
    loader = DataLoader(dataset, batch_size=BATCHSIZE, shuffle=True)

    train_losses = []
    train_accs = []
    valid_losses = []
    valid_accs = []

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss, valid_running_loss = train_fn(
            model, loader, optimizer, loss
        )

        # 訓練データでの損失の保存
        train_losses.append(train_running_loss)

        # 訓練データでの正解率の計算
        train_acc = calc_acc(model, train_x, train_y)
        # 訓練データでの正解率の保存
        train_accs.append(train_acc)

        # 検証データでの損失の保存
        valid_losses.append(valid_running_loss)

        # 検証データでの正解率の計算
        valid_acc = calc_acc(model, valid_x, valid_y)
        # 検証データでの正解率の保存
        valid_accs.append(valid_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"79_model_bs_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"79_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "valid": valid_losses}

    accs = {"train": train_accs, "valid": valid_accs}

    make_graph(losses, "losses", METHOD)
    make_graph(accs, "accs", METHOD)

    print(f"train_acc: {train_acc}")
    print(f"valid_acc: {valid_acc}")
    # train_acc: 84.21101949025487
    # valid_acc: 85.83208395802099

感想

NeuralNetworkは完成済みモデルを少しいじるくらいでそれぞれのコードが何をしているかあまり理解していなかったのですが、本章でどの層が何をしているのかの理解がかなり進みました。