言語処理100本ノック 2020 第9章を解きました。

言語処理100本ノック 2020の第9章を解いたので、その解法と思ったことをつらつら書きます。 学んだことを記録しているため、コード中にコメントが大量にあるのはご容赦ください。

コードはこちら

80. ID番号への変換

愚直にやっています。torchtextを使えばもっと簡単にできそう

# 80. ID番号への変換
# 問題51で構築した学習データ中の単語にユニークなID番号を付与したい.
# 学習データ中で最も頻出する単語に1,2番目に頻出する単語に2,……といった方法で,学習データ中で2回以上出現する単語にID番号を付与せよ.
# そして,与えられた単語列に対して,ID番号の列を返す関数を実装せよ.ただし,出現頻度が2回未満の単語のID番号はすべて0とせよ

import collections
import pickle
import sys

import pandas as pd

sys.path.append("../../src/NLP100")

from util import preprocess


def change_word_to_id(input_word: str, word_id_dict: dict) -> str:
    # ID番号へ変換、辞書に存在しないものは0をいれる
    result_list = []
    for word in input_word.split():
        if word in word_id_dict:
            result_list.append(str(word_id_dict[word]))
        else:
            result_list.append("0")

    return " ".join(result_list)


if __name__ == "__main__":

    # データの読み込み
    train = pd.read_csv("../../ch06/50/train.txt", sep="\t", index_col=0)
    test = pd.read_csv("../../ch06/50/test.txt", sep="\t", index_col=0)

    # trainとtestを結合する
    train["flg"] = "train"
    test["flg"] = "test"
    train_test = pd.concat([train, test])

    # 前処理
    train_test["TITLE"] = train_test[["TITLE"]].apply(preprocess)

    # 全文章を一つにまとめたstrを生成
    all_sentence_list = " ".join(train_test["TITLE"].tolist()).split(" ")

    # 全文章に含まれる単語の頻度を計算
    all_word_cnt = collections.Counter(all_sentence_list)

    # 出現頻度が2回以上の単語のみを取得
    word_cnt_over2 = [i for i in all_word_cnt.items() if i[1] >= 2]
    word_cnt_over2 = sorted(word_cnt_over2, key=lambda x: x[1], reverse=True)

    # 単語のみ取得
    word_over2 = [i[0] for i in word_cnt_over2]
    # ID番号を取得
    id_list = [i for i in range(1, len(word_over2))]

    # 単語とID番号をdictへとまとめる
    word_id_dict = dict(zip(word_over2, id_list))

    # 出力
    with open("word_id_dict.pkl", "wb") as tf:
        pickle.dump(word_id_dict, tf)

    # train_testのTITLEをID番号へと変換
    train_test["TITLE"] = train_test["TITLE"].apply(
        lambda x: change_word_to_id(x, word_id_dict)
    )

    # train, testへ分離
    train = train_test.query('flg == "train"')
    test = train_test.query('flg == "test"')

    # 出力
    train.to_pickle("train_title_id.pkl")
    train.to_csv("train_title_id.csv")

    test.to_pickle("test_title_id.pkl")
    test.to_csv("test_title_id.csv")

81. RNNによる予測

数式をコードとして反映するのにちょっと骨が折れました。

x = x[:, -1, :] # 一番最後の出力に絞る、やっていいのかこれ?というコメントありますが、結論としてやる必要があると考えています。 nn.RNNはsequentialに各単語の出力を吐き出し、それを全て保存しています。 今回必要なのは、一番最後の単語ベクトルの出力のため、x[:, -1, :]で取り出しています。

詳しくは公式のoutputを参照 (間違っていたら教えてください🙇‍♂️)

# 81. RNNによる予測
# https://pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html

import pickle

import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset


class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(
            X_list
        )  # .unsqueeze(0) # unsqueezeはtorch.Size([6]) → torch.Size([1, 6])
        label = torch.tensor(self.y[idx])

        return inputs, label


class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim=300, hidden_size=50, output_size=4):
        super().__init__()

        self.emb = nn.Embedding(
            vocab_size, emb_dim, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        self.rnn = nn.RNN(
            input_size=emb_dim,
            hidden_size=hidden_size,
            num_layers=1,
            nonlinearity="tanh",
            bias=True,
        )

        self.fc = nn.Linear(
            in_features=hidden_size, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        x, h_t = self.rnn(x, h_0)
        x = x[:, -1, :]  # 一番最後の出力に絞る、やっていいのかこれ?
        x = self.fc(x)
        x = self.softmax(x)
        return x


if __name__ == "__main__":

    # データの読み込み
    train = pd.read_pickle("../80/train_title_id.pkl")

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open("../80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    n_letters = len(word_id_dict.keys())
    n_hidden = 50
    n_categories = 4

    # modelの定義
    model = RNN(n_letters, n_hidden, n_categories)

    # datasetの定義
    dataset = TextDataset(train["TITLE"], train["CATEGORY"])

    # 先頭10個の結果を出力
    for i in range(10):
        X = dataset[i][0]
        X = X.unsqueeze(0)
        print(model(x=X))

    # tensor([[0.3996, 0.1701, 0.1483, 0.2821]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.2412, 0.2179, 0.1354, 0.4055]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.1941, 0.3746, 0.0832, 0.3481]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.1472, 0.3155, 0.1035, 0.4338]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.2272, 0.3158, 0.1757, 0.2812]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.2272, 0.3158, 0.1757, 0.2812]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.3958, 0.1784, 0.1419, 0.2839]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.3958, 0.1784, 0.1419, 0.2839]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.1054, 0.5542, 0.0961, 0.2442]], grad_fn=<SoftmaxBackward>)
    # tensor([[0.2882, 0.3606, 0.1956, 0.1556]], grad_fn=<SoftmaxBackward>)

82. 確率的勾配降下法による学習

SGDを用いて学習し、損失と正解率を表示するコードを追加しました。 ミニバッチ化は次の問題のため、BATCHSIZE=1を指定しています。

# 82. 確率的勾配降下法による学習
# 確率的勾配降下法(SGD: Stochastic Gradient Descent)を用いて,問題81で構築したモデルを学習せよ.
# 訓練データ上の損失と正解率,評価データ上の損失と正解率を表示しながらモデルを学習し,
# 適当な基準(例えば10エポックなど)で終了させよ.

import os
import pickle
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list)
        label = torch.tensor(self.y[idx])

        return inputs, label


class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size):
        super().__init__()

        self.emb = nn.Embedding(
            vocab_size, emb_dim, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        self.rnn = nn.RNN(
            input_size=emb_dim,
            hidden_size=hidden_size,
            num_layers=1,
            nonlinearity="tanh",
            bias=True,
            batch_first=True,
        )
        # batch_first=Trueにすると、inputとoutputの型が変わる

        # inputの想定の型を、(L, N, H_in)から(N, L, H_in)に変更する
        # L: Sequence Length
        # N: Batch Size
        # H_in: input size

        # outputの想定の型を、(L, N, D * H_out)から(N, L, D * H_out)に変更する
        # L: Sequence Length
        # N: Batch Size
        # D: 2 if bidirectional=True otherwise 1
        # H_out: hidden size

        self.fc = nn.Linear(
            in_features=hidden_size, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        x, h_t = self.rnn(x, h_0)
        x = x[:, -1, :]  # sequenceの長さ(現在は10)のうち、一番最後の出力に絞る、やっていいのかこれ?
        x = self.fc(x)
        x = self.softmax(x)
        return x


def train_fn(model, loader, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""

    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x
            # x=dataloader_x,
            # h_0=torch.zeros(1 * 1, BATCHSIZE, HIDDEN_SIZE)
        )

        # dataloader_xでの損失の計算
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    # 時間の計測
    start = time.time()

    # データの読み込み
    train = pd.read_pickle("../80/train_title_id.pkl")
    test = pd.read_pickle("../80/test_title_id.pkl")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open("../80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # modelの定義
    model = RNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
    )

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"])
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"])

    # parameterの更新
    BATCHSIZE = 1
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=False, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    # deviceの指定
    device = (
        torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
    )

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model, dataloader_train, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"82_model_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"82_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCHSIZE, method="rnn")
    make_graph(accs, "accs", bn=BATCHSIZE, method="rnn")

    print(f"train_acc: {train_acc}")
    print(f"test_acc: {test_acc}")

    # 時間の出力
    elapsed_time = time.time() - start
    print(elapsed_time)

学習結果はこんな感じ f:id:sinchir0:20210909075933p:plain

時間:390.43s

83. ミニバッチ化・GPU上での学習

BATCHSIZE=32を指定し、ミニバッチ化を行なっています。

# 83. ミニバッチ化・GPU上での学習
# 問題82のコードを改変し,B事例ごとに損失・勾配を計算して学習を行えるようにせよ(Bの値は適当に選べ).また,GPU上で学習を実行せよ.
# GPUで計算しているColabのコード
# https://colab.research.google.com/drive/1IAzvlHQ19RSkqyVk4UosinRUgXnu2Q59?usp=sharing
# dataset, h_0, modelをdevice送りした

import os
import pickle
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list)
        label = torch.tensor(self.y[idx])

        return inputs, label


class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size):
        super().__init__()

        self.emb = nn.Embedding(
            vocab_size, emb_dim, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        self.rnn = nn.RNN(
            input_size=emb_dim,
            hidden_size=hidden_size,
            num_layers=1,
            nonlinearity="tanh",
            bias=True,
            batch_first=True,
        )
        # batch_first=Trueにすると、inputとoutputの型が変わる

        # inputの想定の型を、(L, N, H_in)から(N, L, H_in)に変更する
        # L: Sequence Length
        # N: Batch Size
        # H_in: input size

        # outputの想定の型を、(L, N, D * H_out)から(N, L, D * H_out)に変更する
        # L: Sequence Length
        # N: Batch Size
        # D: 2 if bidirectional=True otherwise 1
        # H_out: hidden size

        self.fc = nn.Linear(
            in_features=hidden_size, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        # 普通のとき、torch.Size([32, 10, 300])
        # 落ちるとき, torch.Size([16, 10, 300])
        # train.shape[0] == 10672
        # 10672を32で割ると、333 余り16
        # よって余りをどう扱うのかの問題になる
        # loaderの引数にdrop_last=Trueを追加すると、余りは捨てるようになる。
        x, h_t = self.rnn(x, h_0)
        x = x[:, -1, :]  # sequenceの長さ(現在は10)のうち、一番最後の出力に絞る、やっていいのかこれ?
        x = self.fc(x)
        x = self.softmax(x)
        return x


def train_fn(
    model, loader, device, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""
    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        dataloader_x.to(device)
        dataloader_y.to(device)

        optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x, h_0=torch.zeros(1 * 1, BATCHSIZE, HIDDEN_SIZE)
        )

        # dataloader_xでの損失の計算
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch")
    plt.legend()
    plt.savefig(f"{method}_{value_name}.png")
    plt.close()


if __name__ == "__main__":

    # データの読み込み
    train = pd.read_pickle("../80/train_title_id.pkl")
    test = pd.read_pickle("../80/test_title_id.pkl")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open("../80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # modelの定義
    model = RNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
    ).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"])
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"])

    # parameterの更新
    BATCHSIZE = 32
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=True, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model,
            dataloader_train,
            device,
            optimizer,
            criterion,
            BATCHSIZE,
            HIDDEN_SIZE,
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"83_model_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"83_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", method="rnn")
    make_graph(accs, "accs", method="rnn")

    print(f"train_acc: {train_acc}")
    print(f"test_acc: {test_acc}")

グラフはこんな感じ f:id:sinchir0:20210909081556p:plain

時間: 35.65s

batchsizeが1の時に必要な時間時間:390.43sと比べると、短時間で学習が終了しているのが分かります。

84. 単語ベクトルの導入

これはちょっと注意が必要だなと感じた問題です。

nn.embedding.from_pretrainで単語埋め込みを初期化するのですが、 その際、今回使用している語彙と初期化に使用する語彙(今回はGoogle Newsデータセット)のidxを合わせる必要があります。

これをやらないと、例えばgoogle newsのidx1番目の語彙がdogで、今回のTEXTの語彙の1番目がupdateだった場合、dogの分散表現でupdateのweightを初期化してしまい、意味のない初期化となります。注意。 コードとしては下記部分でidxを合わせています。

for word, idx in word_id_dict.items():
    if word in model.vocab.keys():
        weight[idx] = torch.tensor(model[word])

以下、全体コード。

# 84. 単語ベクトルの導入
# 事前学習済みの単語ベクトル(例えば,Google Newsデータセット(約1,000億単語)での学習済み単語ベクトル)で単語埋め込みemb(x)を初期化し,学習せよ.

import os
import pickle
import random

import gensim
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from numpy.lib.function_base import kaiser
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y, device):
        self.X = X
        self.y = y
        self.device = device

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list, device=self.device)
        label = torch.tensor(self.y[idx], device=self.device)

        return inputs, label


class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size, word_id_dict):
        super().__init__()

        # embedding層の初期化
        model = gensim.models.KeyedVectors.load_word2vec_format(
            "../../ch07/60/GoogleNews-vectors-negative300.bin", binary=True
        )
        weight = torch.zeros(len(word_id_dict) + 1, 300)  # 1はPADの分
        for word, idx in word_id_dict.items():
            if word in model.vocab.keys():
                weight[idx] = torch.tensor(model[word])

        # random
        # weight = torch.rand(len(word_id_dict)+1, 300) #ランダムな単語ベクトルでも効果があるか

        # pretrainする場合のemb
        # vocab_size, emb_dimは、GoogleNews-vectors-negative300が3000000語彙で300次元だから指定しなくて良い
        self.emb = nn.Embedding.from_pretrained(
            weight, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        # Use False Idx
        # model = gensim.models.KeyedVectors.load_word2vec_format('../../ch07/60/GoogleNews-vectors-negative300.bin', binary=True)
        # weights = torch.FloatTensor(model.vectors)
        # self.emb = nn.Embedding.from_pretrained(weights)

        # default
        # self.emb = nn.Embedding(
        #     vocab_size,
        #     emb_dim,
        #     padding_idx=0 # 0に変換された文字にベクトルを計算しない
        #     )

        self.rnn = nn.RNN(
            input_size=emb_dim,
            hidden_size=hidden_size,
            num_layers=1,
            nonlinearity="tanh",
            bias=True,
            batch_first=True,
        )
        # batch_first=Trueにすると、inputとoutputの型が変わる

        # inputの想定の型を、(L, N, H_in)から(N, L, H_in)に変更する
        # L: Sequence Length
        # N: Batch Size
        # H_in: input size

        # outputの想定の型を、(L, N, D * H_out)から(N, L, D * H_out)に変更する
        # L: Sequence Length
        # N: Batch Size
        # D: 2 if bidirectional=True otherwise 1
        # H_out: hidden size

        self.fc = nn.Linear(
            in_features=hidden_size, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        # 普通のとき、torch.Size([32, 10, 300])
        # 落ちるとき, torch.Size([16, 10, 300])
        # train.shape[0] == 10672
        # 10672を32で割ると、333 余り16
        # よって余りをどう扱うのかの問題になる
        # loaderの引数にdrop_last=Trueを追加すると、余りは捨てるようになる。
        x, h_t = self.rnn(x, h_0)
        x = x[:, -1, :]  # sequenceの長さ(現在は10)のうち、一番最後の出力に絞る、やっていいのかこれ?
        x = self.fc(x)
        x = self.softmax(x)
        return x


def train_fn(
    model, loader, device, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""
    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        dataloader_x.to(device)
        dataloader_y.to(device)

        optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x,
            h_0=torch.zeros(1 * 1, BATCHSIZE, HIDDEN_SIZE),  # .to(device)
        )

        # dataloader_xでの損失の計算/
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    # Colab
    # PATH = '/content/drive/MyDrive/NLP100/ch09'
    # local
    PATH = ".."

    # データの読み込み
    train = pd.read_csv(f"{PATH}/80/train_title_id.csv")
    test = pd.read_csv(f"{PATH}/80/test_title_id.csv")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open(f"{PATH}/80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Use {device}")

    # modelの定義
    model = RNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
        word_id_dict=word_id_dict,
    ).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"], device)
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"], device)

    # parameterの更新
    BATCHSIZE = 32
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=True, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model,
            dataloader_train,
            device,
            optimizer,
            criterion,
            BATCHSIZE,
            HIDDEN_SIZE,
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"84_model_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"84_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCHSIZE, method="rnn_pretrain")
    make_graph(accs, "accs", bn=BATCHSIZE, method="rnn_pretrain")

    print(f"train_acc: {train_acc}")
    print(f"test_acc: {test_acc}")

初期化なしとありでの精度の差は下記のような感じです。 f:id:sinchir0:20210909082833p:plain

精度向上+早めに精度がサチるのがわかります。

85. 双方向RNN・多層化

双方向については、nn.RNNbidirectionalという引数があるので、それをTrueにするだけです。 多層化については、nn.RNNnum_layersという引数があるので、そこに層の数を与えるだけです。便利。 コードは3layerのものを貼っています。

# 85. 双方向RNN・多層化

import os
import pickle
import random

import gensim
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from numpy.lib.function_base import kaiser
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y, device):
        self.X = X
        self.y = y
        self.device = device

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list, device=self.device)
        label = torch.tensor(self.y[idx], device=self.device)

        return inputs, label


class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size, word_id_dict):
        super().__init__()

        # embedding層の初期化
        model = gensim.models.KeyedVectors.load_word2vec_format(
            "../../ch07/60/GoogleNews-vectors-negative300.bin", binary=True
        )
        weight = torch.zeros(len(word_id_dict) + 1, 300)  # 1はPADの分
        for word, idx in word_id_dict.items():
            if word in model.vocab.keys():
                weight[idx] = torch.tensor(model[word])

        # pretrainする場合のemb
        # vocab_size, emb_dimは、GoogleNews-vectors-negative300が3000000語彙で300次元だから指定しなくて良い
        self.emb = nn.Embedding.from_pretrained(
            weight, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        # random
        # weight = torch.rand(len(word_id_dict)+1, 300) #ランダムな単語ベクトルでも効果があるか
        # self.emb = nn.Embedding.from_pretrained(weights)

        # Use False Idx
        # model = gensim.models.KeyedVectors.load_word2vec_format('../../ch07/60/GoogleNews-vectors-negative300.bin', binary=True)
        # weights = torch.FloatTensor(model.vectors)
        # self.emb = nn.Embedding.from_pretrained(weights)

        # # default
        # self.emb = nn.Embedding(
        #     vocab_size,
        #     emb_dim,
        #     padding_idx=0 # 0に変換された文字にベクトルを計算しない
        #     )

        self.rnn = nn.RNN(
            input_size=emb_dim,
            hidden_size=hidden_size,
            num_layers=3,
            nonlinearity="tanh",
            bias=True,
            batch_first=True,
            bidirectional=True,
        )
        # batch_first=Trueにすると、inputとoutputの型が変わる

        # inputの想定の型を、(L, N, H_in)から(N, L, H_in)に変更する
        # L: Sequence Length
        # N: Batch Size
        # H_in: input size

        # outputの想定の型を、(L, N, D * H_out)から(N, L, D * H_out)に変更する
        # L: Sequence Length
        # N: Batch Size
        # D: 2 if bidirectional=True otherwise 1
        # H_out: hidden size

        self.fc = nn.Linear(
            in_features=hidden_size * 2, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        # 普通のとき、torch.Size([32, 10, 300])
        # 落ちるとき, torch.Size([16, 10, 300])
        # train.shape[0] == 10672
        # 10672を32で割ると、333 余り16
        # よって余りをどう扱うのかの問題になる
        # loaderの引数にdrop_last=Trueを追加すると、余りは捨てるようになる。
        x, h_t = self.rnn(x, h_0)
        x = x[:, -1, :]  # sequenceの長さ(現在は10)のうち、一番最後の出力に絞る、やっていいのかこれ?
        x = self.fc(x)
        x = self.softmax(x)
        return x


def train_fn(
    model, loader, device, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""
    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        dataloader_x.to(device)
        dataloader_y.to(device)

        optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x,
            h_0=torch.zeros(2 * 3, BATCHSIZE, HIDDEN_SIZE).to(
                device
            ),  # D * num_layer, bidirectionnalの場合はD=2
        )

        # dataloader_xでの損失の計算/
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    # Colab
    # PATH = '/content/drive/MyDrive/NLP100/ch09'
    # local
    PATH = ".."

    # データの読み込み
    train = pd.read_csv(f"{PATH}/80/train_title_id.csv")
    test = pd.read_csv(f"{PATH}/80/test_title_id.csv")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open(f"{PATH}/80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Use {device}")

    # modelの定義
    model = RNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
        word_id_dict=word_id_dict,
    ).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"], device)
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"], device)

    # parameterの更新
    BATCHSIZE = 32
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=True, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model,
            dataloader_train,
            device,
            optimizer,
            criterion,
            BATCHSIZE,
            HIDDEN_SIZE,
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"85_model_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"85_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCHSIZE, method="rnn_bidirectional_3layer")
    make_graph(accs, "accs", bn=BATCHSIZE, method="rnn_bidirectional_3layer")

    print(f"train_acc: {train_acc: .4f}")
    print(f"test_acc: {test_acc: .4f}")

グラフはこちら、num_layersを増やすと精度的には悪化していくのは違和感ですね。 f:id:sinchir0:20210910074912p:plain

86. 畳み込みニューラルネットワーク (CNN)

nn.RNNで構築していた部分はnn.Conv1dnn.MaxPool1dで置き換えてあげます。 どの次元が何を表すかは、公式リファレンスを見ながら注意して構築していきました。 例えば、'nn.Conv1d'は下記のような感じです。

input size: (N, C_in, L)
N=batch size
C_m=number of channels
L=length of signal sequence
output size: (N, C_out, L_out)

実際のコードはこちら

# 86. 畳み込みニューラルネットワーク (CNN)

import os
import pickle
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from numpy.lib.function_base import kaiser
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y, device):
        self.X = X
        self.y = y
        self.device = device

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list, device=self.device)
        label = torch.tensor(self.y[idx], device=self.device)

        return inputs, label


class CNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size, word_id_dict):
        super().__init__()

        self.emb = nn.Embedding(
            vocab_size, emb_dim, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        self.conv1d = nn.Conv1d(
            in_channels=emb_dim,
            out_channels=hidden_size,
            kernel_size=3,
            stride=1,
            padding=0,  # 4方向にpaddingを行う
            padding_mode="zeros",
        )
        # input size: (N, C_in, L)
        # N=batch size
        # C_m=number of channels
        # L=length of signal sequence
        # output size: (N, C_out, L_out)

        self.pool1d = nn.MaxPool1d(
            kernel_size=8  # conv1dにて300→50にすると、Sequential lengthは10→8になって返ってくる。この8を1にしたいので、全体でpooling
        )
        # input size: (N, C, L)
        # N=batch size
        # C_m=number of channels
        # L=length of signal sequence
        # output size: (N, C, L_out)

        self.fc = nn.Linear(
            in_features=hidden_size, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        x = x.permute(
            0, 2, 1
        )  # 2次元目の数値を1次元目と入れ替え、1次元目にemb dim,2次元目にSequential lengthを持ってくる
        x = self.conv1d(x)
        x = self.pool1d(x)
        x = x[:, :, -1]  # (N, C, L_out)の次元L_outをなくす
        x = self.fc(x)
        x = self.softmax(x)
        return x
        # UserWarning: Named tensors and・・・で謎Warniningが出るけど、気にしなくて良いっぽい
        # https://github.com/pytorch/pytorch/pull/60059


def train_fn(
    model, loader, device, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""
    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        dataloader_x.to(device)
        dataloader_y.to(device)

        # optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x,
            h_0=torch.zeros(2 * 1, BATCHSIZE, HIDDEN_SIZE).to(
                device
            ),  # D * num_layer, bidirectionnalの場合はD=2
        )

        # dataloader_xでの損失の計算/
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        # optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    # Colab
    # PATH = '/content/drive/MyDrive/NLP100/ch09'
    # local
    PATH = ".."

    # データの読み込み
    train = pd.read_csv(f"{PATH}/80/train_title_id.csv")
    test = pd.read_csv(f"{PATH}/80/test_title_id.csv")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open(f"{PATH}/80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Use {device}")

    # modelの定義
    model = CNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
        word_id_dict=word_id_dict,
    ).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    # optimizer = torch.optim.SGD(model.parameters(), lr=0.05)
    optimizer = None

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"], device)
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"], device)

    # parameterの更新
    BATCHSIZE = 32
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=True, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model,
            dataloader_train,
            device,
            optimizer,
            criterion,
            BATCHSIZE,
            HIDDEN_SIZE,
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"86_model_epoch{epoch}.pth")
            # torch.save(
            #     optimizer.state_dict(),
            #     f"86_optimizer_epoch{epoch}.pth",
            # )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCHSIZE, method="cnn")
    make_graph(accs, "accs", bn=BATCHSIZE, method="cnn")

    print(f"train_acc: {train_acc: .4f}")
    print(f"test_acc: {test_acc: .4f}")

87. 確率的勾配降下法によるCNNの学習

SGDを追加して学習します。

# 87. 確率的勾配降下法によるCNNの学習

import os
import pickle
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from numpy.lib.function_base import kaiser
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y, device):
        self.X = X
        self.y = y
        self.device = device

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list, device=self.device)
        label = torch.tensor(self.y[idx], device=self.device)

        return inputs, label


class CNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size, word_id_dict):
        super().__init__()

        # # embedding層の初期化
        # model = gensim.models.KeyedVectors.load_word2vec_format('../../ch07/60/GoogleNews-vectors-negative300.bin', binary=True)
        # weight = torch.zeros(len(word_id_dict)+1, 300) # 1はPADの分
        # for word, idx in word_id_dict.items():
        #     if word in model.vocab.keys():
        #         weight[idx] = torch.tensor(model[word])

        # # pretrainする場合のemb
        # # vocab_size, emb_dimは、GoogleNews-vectors-negative300が3000000語彙で300次元だから指定しなくて良い
        # self.emb = nn.Embedding.from_pretrained(
        #     weight,
        #     padding_idx=0 # 0に変換された文字にベクトルを計算しない
        #     )

        # default
        self.emb = nn.Embedding(
            vocab_size, emb_dim, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        # self.rnn = nn.RNN(
        #     input_size=emb_dim,
        #     hidden_size=hidden_size,
        #     num_layers=1,
        #     nonlinearity='tanh',
        #     bias=True,
        #     batch_first=True,
        #     bidirectional=True
        #     )

        self.conv1d = nn.Conv1d(
            in_channels=emb_dim,
            out_channels=hidden_size,
            kernel_size=3,
            stride=1,
            padding=0,  # 4方向にpaddingを行う
            padding_mode="zeros",
        )
        # input size: (N, C_in, L)
        # N=batch size
        # C_m=number of channels
        # L=length of signal sequence
        # output size: (N, C_out, L_out)

        self.pool1d = nn.MaxPool1d(
            kernel_size=8  # conv1dにて300→50にすると、Sequential lengthは10→8になって返ってくる。この8を1にしたいので、全体でpooling
        )
        # input size: (N, C, L)
        # N=batch size
        # C_m=number of channels
        # L=length of signal sequence
        # output size: (N, C, L_out)

        self.fc = nn.Linear(
            in_features=hidden_size, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        x = x.permute(
            0, 2, 1
        )  # 2次元目の数値を1次元目と入れ替え、1次元目にemb dim,2次元目にSequential lengthを持ってくる
        x = self.conv1d(x)
        x = self.pool1d(x)
        x = x[:, :, -1]  # (N, C, L_out)の次元L_outをなくす
        x = self.fc(x)
        x = self.softmax(x)
        return x
        # UserWarning: Named tensors and・・・で謎Warniningが出るけど、気にしなくて良いっぽい
        # https://github.com/pytorch/pytorch/pull/60059


def train_fn(
    model, loader, device, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""
    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        dataloader_x.to(device)
        dataloader_y.to(device)

        optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x,
            h_0=torch.zeros(2 * 1, BATCHSIZE, HIDDEN_SIZE).to(
                device
            ),  # D * num_layer, bidirectionnalの場合はD=2
        )

        # dataloader_xでの損失の計算/
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    # Colab
    # PATH = '/content/drive/MyDrive/NLP100/ch09'
    # local
    PATH = ".."

    # データの読み込み
    train = pd.read_csv(f"{PATH}/80/train_title_id.csv")
    test = pd.read_csv(f"{PATH}/80/test_title_id.csv")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open(f"{PATH}/80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Use {device}")

    # modelの定義
    model = CNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
        word_id_dict=word_id_dict,
    ).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"], device)
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"], device)

    # parameterの更新
    BATCHSIZE = 32
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=True, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    EPOCH = 10
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model,
            dataloader_train,
            device,
            optimizer,
            criterion,
            BATCHSIZE,
            HIDDEN_SIZE,
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"87_model_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"87_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCHSIZE, method="cnn")
    make_graph(accs, "accs", bn=BATCHSIZE, method="cnn")

    print(f"train_acc: {train_acc: .4f}")
    print(f"test_acc: {test_acc: .4f}")

グラフはこちら。RNNよりもtrainとtestの乖離が酷くなりました。 f:id:sinchir0:20210910075921p:plain

88. パラメータチューニング

ハイパラチューニングの域を超えていますが、rnn→bi-lstmに変更し、epochを30までに変更しました。

# 88. パラメータチューニング

import os
import pickle
import random

import gensim
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from numpy.lib.function_base import kaiser
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


seed_everything(use_torch=True)


class TextDataset(Dataset):
    def __init__(self, X, y, device):
        self.X = X
        self.y = y
        self.device = device

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        # Xをintのlistに変換
        X_list = [int(x) for x in self.X[idx].split()]

        # tensorに変換
        inputs = torch.tensor(X_list, device=self.device)
        label = torch.tensor(self.y[idx], device=self.device)

        return inputs, label


class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, hidden_size, output_size, word_id_dict):
        super().__init__()

        # embedding層の初期化
        model = gensim.models.KeyedVectors.load_word2vec_format(
            "../../ch07/60/GoogleNews-vectors-negative300.bin", binary=True
        )
        weight = torch.zeros(len(word_id_dict) + 1, 300)  # 1はPADの分
        for word, idx in word_id_dict.items():
            if word in model.vocab.keys():
                weight[idx] = torch.tensor(model[word])

        # pretrainする場合のemb
        # vocab_size, emb_dimは、GoogleNews-vectors-negative300が3000000語彙で300次元だから指定しなくて良い
        self.emb = nn.Embedding.from_pretrained(
            weight, padding_idx=0  # 0に変換された文字にベクトルを計算しない
        )

        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hidden_size,
            num_layers=1,
            bias=True,
            batch_first=True,
            dropout=0.25,
            bidirectional=True,
        )
        # batch_first=Trueにすると、inputとoutputの型が変わる

        # inputの想定の型を、(L, N, H_in)から(N, L, H_in)に変更する
        # L: Sequence Length
        # N: Batch Size
        # H_in: input size

        # outputの想定の型を、(L, N, D * H_out)から(N, L, D * H_out)に変更する
        # L: Sequence Length
        # N: Batch Size
        # D: 2 if bidirectional=True otherwise 1
        # H_out: hidden size

        self.fc = nn.Linear(
            in_features=hidden_size * 2, out_features=output_size, bias=True
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, h_0=None):
        x = self.emb(x)
        # 普通のとき、torch.Size([32, 10, 300])
        # 落ちるとき, torch.Size([16, 10, 300])
        # train.shape[0] == 10672
        # 10672を32で割ると、333 余り16
        # よって余りをどう扱うのかの問題になる
        # loaderの引数にdrop_last=Trueを追加すると、余りは捨てるようになる。
        x, h_t = self.lstm(x, h_0)
        x = x[:, -1, :]
        x = self.fc(x)
        x = self.softmax(x)
        return x


def train_fn(
    model, loader, device, optimizer, criterion, BATCHSIZE, HIDDEN_SIZE
) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""
    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for dataloader_x, dataloader_y in loader:
        dataloader_x.to(device)
        dataloader_y.to(device)

        optimizer.zero_grad()

        dataloader_y_pred_prob = model(
            x=dataloader_x,
            # h_0=torch.zeros(2 * 1, BATCHSIZE, HIDDEN_SIZE).to(device) # D * num_layer, bidirectionnalの場合はD=2
            h_0=None,
        )

        # dataloader_xでの損失の計算/
        loss = criterion(dataloader_y_pred_prob, dataloader_y)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for dataloader_x, dataloader_y in dataloader:
            # 順伝播
            outputs = model(dataloader_x)

            # 損失計算
            loss += criterion(outputs, dataloader_y).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(dataloader_x)
            correct += (pred == dataloader_y).sum().item()

    return loss / len(dataset), correct / total


def padding(id_seq: str, max_len: int):
    """id_seqについて、
    max_lenより長い場合はmax_lenまでの長さにする。
    max_lenより短い場合はmax_lenになるように0を追加する。
    """
    id_list = id_seq.split(" ")
    if len(id_list) > max_len:
        id_list = id_list[:max_len]
    else:
        pad_num = max_len - len(id_list)
        for _ in range(pad_num):
            id_list.append("0")
    return " ".join(id_list)


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    # Colab
    # PATH = '/content/drive/MyDrive/NLP100/ch09'
    # local
    PATH = ".."

    # データの読み込み
    train = pd.read_csv(f"{PATH}/80/train_title_id.csv")
    test = pd.read_csv(f"{PATH}/80/test_title_id.csv")
    test = test.reset_index(drop=True)

    # paddingの実施
    max_len = 10
    train["TITLE"] = train["TITLE"].apply(lambda x: padding(x, max_len))
    test["TITLE"] = test["TITLE"].apply(lambda x: padding(x, max_len))

    # yの変換
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    train["CATEGORY"] = train["CATEGORY"].map(cat_id_dict)
    test["CATEGORY"] = test["CATEGORY"].map(cat_id_dict)

    # 辞書の読み込み
    with open(f"{PATH}/80/word_id_dict.pkl", "rb") as tf:
        word_id_dict = pickle.load(tf)

    N_LETTERS = len(word_id_dict.keys()) + 1  # pad分をplusする。
    EMB_SIZE = 300
    HIDDEN_SIZE = 50
    N_CATEGORIES = 4

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Use {device}")

    # modelの定義
    model = RNN(
        vocab_size=N_LETTERS,
        emb_dim=EMB_SIZE,
        hidden_size=HIDDEN_SIZE,
        output_size=N_CATEGORIES,
        word_id_dict=word_id_dict,
    ).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)

    # datasetの定義
    dataset_train = TextDataset(train["TITLE"], train["CATEGORY"], device)
    dataset_test = TextDataset(test["TITLE"], test["CATEGORY"], device)

    # parameterの更新
    BATCHSIZE = 32
    dataloader_train = DataLoader(
        dataset_train, batch_size=BATCHSIZE, shuffle=True, drop_last=True
    )

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    EPOCH = 30
    for epoch in tqdm(range(EPOCH)):

        # 学習
        train_running_loss = train_fn(
            model,
            dataloader_train,
            device,
            optimizer,
            criterion,
            BATCHSIZE,
            HIDDEN_SIZE,
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"88_model_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"88_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCHSIZE, method="bilstm")
    make_graph(accs, "accs", bn=BATCHSIZE, method="bilstm")

    print(f"train_acc: {train_acc: .4f}")
    print(f"test_acc: {test_acc: .4f}")

グラフはこちら。RNNよりも精度は落ちています。 f:id:sinchir0:20210910080242p:plain

89. 事前学習済み言語モデルからの転移学習

BERT, RoBERTa, ALBERT, CANINEでやってみました。 コード例はBERTです。

# 89. 事前学習済み言語モデルからの転移学習
# 事前学習済み言語モデル(例えばBERTなど)を出発点として,ニュース記事見出しをカテゴリに分類するモデルを構築せよ.

# https://qiita.com/yamaru/items/63a342c844cff056a549

import os
import random
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import BertModel, BertTokenizer


def seed_everything(seed=42, use_torch=False):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    if use_torch:
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True


class TextDataset(Dataset):
    def __init__(self, X, y, tokenizer, max_len, device):
        self.X = X
        self.y = y
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.device = device

    def __len__(self):  # len(Dataset)で返す値を指定
        return len(self.X)

    def __getitem__(self, idx):  # Dataset[index]で返す値を指定
        text = self.X[idx]
        inputs = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            # pad_to_max_length=True
            padding="max_length",
            truncation=True,
        )
        ids = inputs["input_ids"]
        mask = inputs["attention_mask"]

        return {
            "ids": torch.tensor(ids, device=device),
            "mask": torch.tensor(mask, device=device),
            "labels": torch.tensor([self.y[idx]], device=device),
        }


class BERTClass(nn.Module):
    def __init__(self, drop_rate, output_size):
        super().__init__()
        # outputs = model(ids, mask)
        # にて、下のエラーで落ちる
        # TypeError: dropout(): argument 'input' (position 1) must be Tensor, not str
        # https://github.com/huggingface/transformers/issues/8879#issuecomment-796328753
        # return_dict=Falseを追加したら解決
        # self.bert = BertModel.from_pretrained('bert-base-uncased', return_dict=False)
        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.drop = nn.Dropout(drop_rate)
        self.fc = nn.Linear(768, output_size)  # BERTの出力に合わせて768次元を指定
        self.softmax = nn.Softmax(dim=1)

    def forward(self, ids, mask):
        # _, x = self.bert(ids, attention_mask=mask)
        outputs = self.bert(ids, attention_mask=mask)
        _, x = outputs["last_hidden_state"], outputs["pooler_output"]
        # 引数の一つ目は、(batch_size, seq_length=10, 768)のテンソル、last_hidden_state
        # 引数の二つめは、(batch_size, 768)のテンソル、pooler_output
        # これは先頭単語[CLS]を取り出して、BertPoolerにて、同じhiddensize→hiddensizeへと全結合層を通して、その後tanhを通して-1~1にしたもの。
        x = self.drop(x)
        x = self.fc(x)
        x = self.softmax(x)
        return x


def train_fn(model, dataset, device, optimizer, criterion, BATCH_SIZE) -> float:
    """model, loaderを用いて学習を行い、lossを返す"""

    # dataloaderを生成
    loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)

    # 学習モードに設定
    model.train()

    train_running_loss = 0.0

    for data in loader:
        # デバイスの指定
        ids = data["ids"].to(device)
        mask = data["mask"].to(device)
        labels = data["labels"].to(device)

        # labelsの次元を(BATCH_SIZE, 1)から(BATCH_SIZE,)に変更
        labels = labels.squeeze(1)

        optimizer.zero_grad()

        outputs = model(ids, mask)
        # dataloader_xでの損失の計算/
        loss = criterion(outputs, labels)
        # 勾配の計算
        loss.backward()
        optimizer.step()

        # 訓練データでの損失の平均を計算する
        train_running_loss += loss.item() / len(loader)

    return train_running_loss


def calculate_loss_and_accuracy(model, dataset, device=None, criterion=None):
    """損失・正解率を計算"""
    # 評価モードに設定
    model.eval()

    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    loss = 0.0
    total = 0
    correct = 0

    with torch.no_grad():
        for data in dataloader:
            # 変数の生成、device送り
            ids = data["ids"].to(device)
            mask = data["mask"].to(device)
            labels = data["labels"].to(device)

            # 順伝播
            outputs = model(ids, mask)

            # labelsの次元を(BATCH_SIZE, 1)から(BATCH_SIZE,)に変更
            labels = labels.squeeze(1)

            # 損失計算
            loss += criterion(outputs, labels).item()

            # 正解率計算
            pred = torch.argmax(outputs, dim=-1)
            total += len(labels)
            correct += (pred == labels).sum().item()

    return loss / len(dataset), correct / total


def make_graph(value_dict: dict, value_name: str, bn: int, method: str) -> None:
    """value_dictに関するgraphを生成し、保存する。"""
    for phase in ["train", "test"]:
        plt.plot(value_dict[phase], label=phase)
    plt.xlabel("epoch")
    plt.ylabel(value_name)
    plt.title(f"{value_name} per epoch at bn{bn}")
    plt.legend()
    plt.savefig(f"{method}_{value_name}_bn{bn}.png")
    plt.close()


if __name__ == "__main__":

    DEBUG = False
    if DEBUG:
        print("DEBUG mode")

    METHOD = "bert"

    # 時間の計測開始
    start_time = time.time()

    # seedの固定
    seed_everything(use_torch=True)

    # Colab
    # PATH = '/content/drive/MyDrive/NLP100'
    # local
    PATH = "../../"

    # deviceの指定
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Use {device}")

    # データの読み込み
    train = pd.read_csv(f"{PATH}/ch06/50/train.txt", sep="\t", index_col=0)
    test = pd.read_csv(f"{PATH}/ch06/50/test.txt", sep="\t", index_col=0)

    # indexを再設定
    train = train.reset_index(drop=True)
    test = test.reset_index(drop=True)

    # 計算の短縮
    if DEBUG:
        train = train.sample(1000).reset_index(drop=True)
        test = test.sample(1000).reset_index(drop=True)

    # 正解データの生成
    cat_id_dict = {"b": 0, "t": 1, "e": 2, "m": 3}
    y_train = train["CATEGORY"].map(cat_id_dict)
    y_test = test["CATEGORY"].map(cat_id_dict)

    # tokenizerの読み込み
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    max_len = 10

    # datasetの作成
    dataset_train = TextDataset(train["TITLE"], y_train, tokenizer, max_len, device)
    dataset_test = TextDataset(test["TITLE"], y_test, tokenizer, max_len, device)
    # {'input_ids': [101, 2885, 6561, 24514, 2391, 2006, 8169, 2586, 102, 0], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 0]}
    # BERTでは、変換の過程で元の文の文頭と文末に特殊区切り文字である[CLS]と[SEP]がそれぞれ挿入されるため、それらも101と102として系列に含まれています。0はパディングを表します。

    # パラメータの設定
    DROP_RATE = 0.4
    OUTPUT_SIZE = 4
    BATCH_SIZE = 32
    NUM_EPOCHS = 30
    if DEBUG:
        NUM_EPOCHS = 2
    LEARNING_RATE = 2e-5

    # モデルの定義
    model = BERTClass(DROP_RATE, OUTPUT_SIZE).to(device)

    # criterion, optimizerの設定
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)

    train_losses = []
    train_accs = []
    test_losses = []
    test_accs = []

    for epoch in tqdm(range(NUM_EPOCHS)):
        # 学習
        train_running_loss = train_fn(
            model, dataset_train, device, optimizer, criterion, BATCH_SIZE
        )
        print(train_running_loss)

        # 損失と正解率の算出
        train_loss, train_acc = calculate_loss_and_accuracy(
            model, dataset_train, device, criterion
        )
        test_loss, test_acc = calculate_loss_and_accuracy(
            model, dataset_test, device, criterion
        )

        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_losses.append(test_loss)
        test_accs.append(test_acc)

        # 20epoch毎にチェックポイントを生成
        if epoch % 20 == 0:
            torch.save(model.state_dict(), f"89_{METHOD}_epoch{epoch}.pth")
            torch.save(
                optimizer.state_dict(),
                f"89_{METHOD}_optimizer_epoch{epoch}.pth",
            )

    # グラフへのプロット
    losses = {"train": train_losses, "test": test_losses}

    accs = {"train": train_accs, "test": test_accs}

    make_graph(losses, "losses", bn=BATCH_SIZE, method=METHOD)
    make_graph(accs, "accs", bn=BATCH_SIZE, method=METHOD)

    print(f"train_acc: {train_acc: .4f}")
    print(f"test_acc: {test_acc: .4f}")

    # 計測終了
    elapsed_time = time.time() - start_time
    print(f"elapsed_time:{elapsed_time:.0f}[sec]")

結果は下記のようになりました。RNNよりも全般的に悪い点が気になります。 また、この中ではALBERTが一番良いのも気になりますね。

model train_acc test_acc time(s)
BERT 0.6765 0.6304 4221
RoBERTa 0.4191 0.4505 4047
ALBERT 0.7848 0.7391 6941(K80)
CANINE 0.4191 0.4505 4944

感想

RNNのinput,outputやBERTの正確なoutputなどの理解がない状態で挑んだため、この章は時間がかかりました。 ただ、やり切ったおかげで簡単なモデルであればすぐに作れるようになったかなと思います。やってよかった自然言処理100本ノック。