【Python×機械学習】線形回帰のサンプルプログラムを作る


データ分析の手法の一つである線形回帰のサンプルプログラムを作ってみたいと思います。

課題は株価の動向を予測するというものです。

Python Machine Learning By Example: The easiest way to get into machine learning


この書籍の後半で株価を終値を予測するサンプルが紹介されていました。
このサンプルではライブラリーを使用していましたが、
使い方がわからないため、そして理解の助けとするためスクラッチで作りたいと思います。


学習プロセス

予測

  • w … 重み
  • x … 入力
  • y …出力


nをデータの数とすると以下のような公式になります。

y = \sum_{i=0}^n w_ix_i

誤差計算

  • y …出力
  • t … 正解


e = \frac{1}{n}\sum_{i=0}^n \frac{1}{2} (y_i-t_i)^2

勾配計算


w=w+\eta\frac{1}{n}\sum_{i=0}^n (t_i-y_i)x_i


基本的に以下の作業を繰り返します。

  1. 予測
  2. 誤差の計測
  3. 勾配計算

予測に必要な特徴量を抽出


株価を予測すると言っても方法はいろいろあると思います。

今回は線形回帰モデルを使い具体的な値を求めることで株価を予測しますが、
その場合学習させるためにいろいろとパラメーターを準備する必要があります。

今回実装するモデルでは37個の特徴量が必要になります。
この特徴量を入力されたデータから抽出します。

今回使用するヤフーファイナンスからダウンロードできるcsvのフォーマットは以下のような構成です。
https://finance.yahoo.com/sector/ms_technology

Date,Open,High,Low,Close,Adj Close,Volume
1996-08-09,14.250000,16.750000,14.250000,16.500000,15.264661,1601500
1996-08-12,16.500000,16.750000,16.375000,16.500000,15.264661,260900
...


そしてこのデータから以下の37個の数値を抽出します。

  1. 過去5日間の終値の平均
  2. 過去30日間の終値の平均
  3. 過去365日間の終値の平均
  4. 過去5日間の終値の平均 / 過去30日間の終値の平均
  5. 過去5日間の終値の平均 / 過去365日間の終値の平均
  6. 過去30日間の終値の平均 / 過去365日間の終値の平均
  7. 過去5日間の出来高の平均
  8. 過去30日間の出来高の平均
  9. 過去365日間の出来高の平均
  10. 過去5日間の出来高の平均 / 過去30日間の出来高の平均
  11. 過去5日間の出来高の平均 / 過去365日間の出来高の平均
  12. 過去30日間の出来高の平均 / 過去365日間の出来高の平均
  13. 過去5日間の終値の標準偏差
  14. 過去30日間の終値の標準偏差
  15. 過去365日間の終値の標準偏差
  16. 過去5日間の終値の標準偏差 / 過去30日間の終値の標準偏差
  17. 過去5日間の終値の標準偏差 / 過去365日間の終値の標準偏差
  18. 過去30日間の終値の標準偏差 / 過去365日間の終値の標準偏差
  19. 過去5日間の出来高の標準偏差
  20. 過去30日間の出来高の標準偏差
  21. 過去365日間の出来高の標準偏差
  22. 過去5日間の出来高の標準偏差 / 過去30日間の出来高の標準偏差
  23. 過去5日間の出来高の標準偏差 / 過去365日間の出来高の標準偏差
  24. 過去30日間の出来高の標準偏差 / 過去365日間の出来高の標準偏差
  25. 過去1日の収益率
  26. 過去5日の収益率
  27. 過去30日の収益率
  28. 過去365日の収益率
  29. 過去5日間の移動平均
  30. 過去30日間の移動平均
  31. 過去365日間の移動平均
  32. その日の始値
  33. 前日の始値
  34. 前日の終値
  35. 前日の高値
  36. 前日の安値
  37. 前日の出来高

実装


やることがそこそこ多いのでひとつずつやっていきたいと思います。

csvからデータを取得


上で記載したヤフーファイナンスから株価データをもらいます。
どんな企業の株価データでも問題ないと思います。

時々「null」という単語が含まれている場合があります。
手動で消してから処理に通します。

csvからデータを持ってくるソースコードです。
単純に一行ずつ配列に入れているだけですね。

def get_data_from_file(file):
    f = open(file, "r")
    # 一応ヘッダーを調べる
    if len(f.readline().split(",")) != 7:
        return None

    # リストで取得
    stock_datas = f.readlines()
    f.close()

    # カラムの下にそれぞれの値の配列を持つ
    column = ["Date","Open","High","Low","Close","Adj Close","Volume"]
    stocks = {}

    for c in column:
        stocks[c] = []

    for i in range(len(stock_datas)):
        sd = stock_datas[i].split(",")
        for j in range(len(sd)):
            if column[j] == "Date":
                stocks[column[j]].append(sd[j].replace("-",""))
            else:
                stocks[column[j]].append(float(sd[j]))

    return stocks

data = get_data_from_file(取得したファイル名)

特徴量抽出


次は特徴量の抽出です。
先ほど挙げた37個の値を生成します。
そのソースコードがこちらです。

他の方法(LSTMなど)では調べが足りていないのでわかりませんが、
線形回帰で特徴量を抽出する場合少なくとも365日分以上のデータが必要になります。
ですから株価データの初めから366番めを予測するデータの先頭とします。

def get_stock_features(data):
    START = 365
    if len(data["Date"]) < START:
        return None

    features = []

    for i in range(len(data["Date"])-START):
        feature = []

        for column in ["Close", "Volume"]:
            index = START+i
            # (上で挙げた特徴量リストの)1,7
            average5 = get_average(data[column][index-5:index])
            # 2,8
            average30 = get_average(data[column][index-30:index])
            # 3,9
            average365 = get_average(data[column][index-365:index])
            feature.append(average5)
            feature.append(average30)
            feature.append(average365)
            # 4,10
            feature.append(average5/average30)
            # 5,11
            feature.append(average5/average365)
            # 6,12
            feature.append(average30/average365)

            # 13,19
            std_dev5 = get_std_deviation(data[column][index-5:index])
            # 14,20
            std_dev30 = get_std_deviation(data[column][index-30:index])
            # 15,21
            std_dev365 = get_std_deviation(data[column][index-365:index])

            feature.append(std_dev5)
            feature.append(std_dev30)
            feature.append(std_dev365)
            # 16, 22
            feature.append(std_dev5/std_dev30)
            # 17, 23
            feature.append(std_dev5/std_dev365)
            # 18, 24
            feature.append(std_dev30/std_dev365)

        # 25
        feature.append( get_return(data["Close"], index, 1) )
        # 26
        feature.append( get_return(data["Close"], index, 5) )
        # 27
        feature.append( get_return(data["Close"], index, 30) )
        # 28
        feature.append( get_return(data["Close"], index, 365) )

        # 29
        feature.append( get_moving_average(data["Close"][index-5:index], 5) )
        # 30
        feature.append( get_moving_average(data["Close"][index-30:index], 30) )
        # 31
        feature.append( get_moving_average(data["Close"][index-365:index], 365) )

        # 32
        feature.append(data["Open"][index])
        # 33
        feature.append(data["Open"][index-1])
        # 34
        feature.append(data["Close"][index-1])
        # 35
        feature.append(data["High"][index-1])
        # 36
        feature.append(data["Low"][index-1])
        # 37
        feature.append(data["Volume"][index-1])

        features.append(feature)

    return features


そしてこの関数から呼ばれる関数たちです。

# 収益率
def get_return(data, index, day):
    return (data[index] - data[index-day]) / data[index-day]

# 移動平均
def get_moving_average(data, day):
    if len(data) < 5:
        return 0

    re_value = []

    for i in range(len(data)-1, len(data)-day ,-1):
        re_value.append( get_return(data, i, 1) )

    return sum(re_value) / len(re_value)

# 標準偏差
def get_std_deviation(data):
    d = []
    avg = get_average(data)
    for i in range(len(data)):
        d.append( (avg-data[i]) ** 2 )
    d = sum(d) / len(d)

    return math.sqrt(d)

# 平均
def get_average(data):
    return sum(data) / len(data)


これで特徴量を抽出出来ました。
ただひとつ問題があります。

今回SGDで学習していくわけですが、
SGDはパラメーター間の規模が大きすぎるとどうやらうまく学習できないそうなので、
特徴量をそれぞれ正規化します。

それを行う処理がこちらです。

def get_rescaled_feature(features):
    rescaled_f = []
    means = []
    std_devs = []
    # 平均、標準偏差
    for i in range(len(features[0])):
        data = []
        for j in range(len(features)):
            data.append(features[j][i])
        means.append(get_average(data))
        std_devs.append(get_std_deviation(data))

    for i in range(len(features)):
        f = []
        for j in range(len(features[i])):
            f.append((features[i][j] - means[j]) / std_devs[j])
        rescaled_f.append(f)

    return rescaled_f


これは特徴量 - 特徴量の平均 / 特徴量の標準偏差を計算します。
ちなみに平均と標準偏差はすべてのデータの同じ種類の特徴量の平均と標準偏差です。
例えば特徴量データが100個あり「過去5日間の終値」の平均を求める場合、
100個の「過去5日間の終値」の平均を求めるということです。


そしてターゲットパラメーター(正解の値)も同じように正規化します。
やっていることは上とほぼ同じです。

def get_rescaled_target(target):
    mean = get_average(target)
    std_dev = get_std_deviation(target)
    res = []

    for i in range(len(target)):
        res.append((target[i] - mean) / std_dev)

    return res

予測


実際に入力データ(特徴量)から株価を予測する処理です。
データと重みの内積を計算し結果を返します。
その結果が予測した値ということです。

def predict(x, w):
    if len(x[0]) != len(w):
        return None

    predictions = []

    for i in range(len(x)):
        predictions.append(dot(x[i], w))

    return predictions

def dot(v1,v2):
    if len(v1) != len(v2):
        return None

    value = 0
    for i in range(len(v1)):
        value += v1[i] * v2[i]

    return value


そして誤差を求める関数です。

def get_error(data, target):
    error = 0
    for i in range(len(data)):
        error += (data[i] - target[i]) ** 2 / 2
    return error / len(data)

重み更新


重みを更新する関数です。

def update_weight(f, p, t, w):
    if len(t) != len(p):
        return w
    lr = 0.01
    w_del = [0 for _ in range(len(w))]
    weight = [0 for _ in range(len(w))]
    for i in range(len(p)):
        t_p = t[i] - p[i]
        for j in range(len(w)):
            w_del[j] += t_p*f[i][j]

    for i in range(len(w)):
        weight[i] = w[i] + w_del[i]/len(p) * lr

    return weight

訓練


実際に動かします。
そのテストコードです。
だいぶ適当な作りですが、このあたりは人によって違ってくるのかなと思います。

とりあえず役に立ちそうな値を出力するようにしています。

def train(data):
    features = get_stock_features(data)
    features = get_rescaled_feature(features)
    raw_target = get_target(data, "Close", START)
    target = get_rescaled_target(raw_target)
    weight = [random() for _ in range(len(features[0]))]
    target_date = get_target(data, "Date", START)

    t_avg = get_average(raw_target)
    t_std = get_std_deviation(raw_target)

    ind_off = 9000
    train_f = features[:ind_off]
    train_t = target[:ind_off]

    for cnt in range(10000):
        randind = []
        datas = []
        targets = []
        for _ in range(100):
            while True:
                ind = randint(0,len(train_f)-1)
                if ind not in randind:
                    randind.append(ind)
                    datas.append(train_f[ind])
                    targets.append(train_t[ind])
                    break

        predictions = predict(datas, weight)
        if cnt % 100 == 0:
            print("error: {}".format(get_error(predictions, targets)))

        weight = update_weight(datas, predictions, targets, weight)

    print("test-----------------------------------------------------")
    test_f = features[ind_off:]
    test_t = target[ind_off:]
    test_d = target_date[ind_off:]
    r_test_t = raw_target[ind_off:]
    o_error = []
    for i in range(len(test_f)):
        print("[*] Target Date: {}".format(test_d[i]))
        predictions = predict([test_f[i]], weight)
        o_pre = t_avg / t_std * t_std + (predictions[0] * t_std)
        o_num_error = o_pre - r_test_t[i]
        o_error.append(o_num_error)
        print("- original price: {}".format(r_test_t[i]))
        print("- price: {}".format(test_t[i]))
        print("- original predicted price: {}".format(o_pre))
        print("- prediction: {}".format(predictions[0]))
        print("- std dev error: {}".format(predictions[0] - test_t[i]))
        print("- original number error: {}".format(o_num_error))
        print("")
    return sum(o_error) / len(o_error)

data = get_data_from_file("AAPL.csv")
error = train(data)
print("error: {}".format(error))


これを実行する訓練10000回ごとに誤差が表示されます。
10000回終了したらテストに入ります。
テスト時に以下のような情報を表示するようにしました。

[*] Target Date: 20190627
- original price: 199.740005
- price: 3.33266785074
- original predicted price: 201.776079227
- prediction: 3.37258021679
- std dev error: 0.0399123660457
- original number error: 2.03607422735

[*] Target Date: 20190628
- original price: 197.919998
- price: 3.29699096597
- original predicted price: 201.382371825
- prediction: 3.36486252469
- std dev error: 0.0678715587165
- original number error: 3.46237382455

[*] Target Date: 20190701
- original price: 201.550003
- price: 3.368148533
- original predicted price: 202.52252092
- prediction: 3.38721242108
- std dev error: 0.0190638880821
- original number error: 0.972517919701

[*] Target Date: 20190702
- original price: 202.729996
- price: 3.39127947386
- original predicted price: 204.498543832
- prediction: 3.4259476247
- std dev error: 0.0346681508378
- original number error: 1.76854783176

[*] Target Date: 20190703
- original price: 204.410004
- price: 3.42421201304
- original predicted price: 204.552333891
- prediction: 3.42700205018
- std dev error: 0.00279003713914
- original number error: 0.142329891088


今回はAppleの株価データを使用しました。
訓練データ9000個に対してテストデータは700個ほどです。
訓練データが少なすぎるとうまく学習できません。
個数に関してはうまく調整する必要があると思います。

所感


書籍だけではなくネットでもいろいろ調べたのですが、
株価の予測については今回試した方法だけではなく他にもいろいろあるようです。

まあこの方法でも一応学習はしているし、予測もまったくの的外れではないんですよね。