PyTorchニューラルネットワークのライブラリ化の試み

前回、PyTorchでニューラルネットワークを組んで手書き数字の認識を行いまして、9割くらいの正解率になりました。
昔、勉強にニューラルネットワークをPythonで組んだことがありますが、実用的な使い方がわからず、書いてみた仕舞いでしたが、初めて実用性を実感しました。

そこで、学習させたネットワークのライブラリ化ができないかなぁと思って、実施したことのメモです。

 

要約すると、実行ファイル(exe)化はできなかったので、PyTorchで学習させた重み等のパラメータを、Cythonで組んだ自前のニューラルネットワークライブラリに読み込ませるということを行いました。

 

 

環境:
Windows10 64bit
Python3.6.7

 

実行ファイル化の挑戦

まず最初に、実行ファイル化(exe)してみました。

py2exe

最初にずっと利用してきたpy2exeを使いました。試しにPyTorchをインポートするだけのファイルをexe化すると、容量は45MBほど。numpyが半分くらい占めてますかね。

問題は実行時にImportErrorになることです。

ImportError: (DLL load failed: 指定されたモジュールが見つかりません。) 'C:\\dist-dir\\torch._C.pyd'

torch._C.pydが無いと言われるのだけど、その場所にあるんですよね。
解決法が見出せませんので、あきらめました。ちなみに環境変数のPATHを追加してみても同じでした。

 

cx_freeze

次に初めてcx_freezeを使いましたが、デフォルトでexe化すると容量が3.72GB。
Pythonのインストールフォルダの容量が3.49GBなので、全部もってきたんか。なんだそりゃ。

PyInstallerもインストールされているモジュールを全て含む仕様とのことで、試していません。PyInstallerではexe化する際は、最低限のモジュールだけを入れた環境で実施するのが良いとのことですが、cx_freezeも同じですかね。
配布するソフト毎に環境を作るなんてできないので、あきらめました。

 

自前ニューラルネットワークの挑戦

過去に組んだニューラルネットワークでPyTorchで行ったのと同様の学習をさせてみました。
Pure Pythonで組んでいるので激遅のため、3000文字ほどしか学習させませんでしたが、一向に誤差が減らず、学習していない様子なのであきらめました。結構色々粘りましたが。

PyTorchの場合は、3000文字学習した時点で7割くらいの正解率になっているので、誤差の修正方法が違うとしても、まったく学習しないのは変なので、コードが間違っているかもしれません。

でも、3x3x3など少ないノード数でテストすると、狙った値に調整されるので、もしからしたら学習が足りてないだけかもしれませんので、一応、ソースコードを載せておきます。

ソースコードは下記の書籍のC言語のコードをPython化したものです。中身を理解することが目的なので、遅いけど分かりやすいforループで組んでいます。活性化関数はシグモイド関数オンリーです。

 

書籍;ゲーム開発者のためのAI入門

 

[simple_nn.py]

import math,random

class Node:
    def __init__(self,no):
        self.no = no
        self.value = 0.0
        self.targetValue= 0.0
        self.bias = 1.0
        self.biasWeight = 0.0
        self.error = 0.0

    def setLayer(self,layer):
        self.layer = layer
        self.weights = []
        self.weightChanges = []
        if layer.childLayer:
            for n in layer.childLayer.nodes:
                self.weights.append(0.0)
                self.weightChanges.append(0.0)

class Layer:
    learningRate = 0.25
    useMomentum = True
    momentumFactor = 0.5
    parentLayer = None
    childLayer = None
    nodes = []

    def __init__(self,nodes_num):
        self.parentLayer = None
        self.childLayer = None
        self.nodes = []

        for i in range(nodes_num):
            node = Node(i)
            self.nodes.append(node)

    def setup(self):
        for node in self.nodes:
            node.setLayer(self)
        if self.childLayer:
            self.randWeights()

    def randWeights(self):
        for node in self.nodes:
            node.biasWeight = random.random()*2-1
            for cnode in node.layer.childLayer.nodes:
                node.weights[cnode.no] = random.random()*2-1

    def calcuValues(self):
        if self.parentLayer:
            max_value = -math.inf
            for node in self.nodes:
                value = 0.0
                for pnode in self.parentLayer.nodes:
                    value += pnode.value * pnode.weights[node.no]
                value += node.bias * node.biasWeight

                if value > 709: value = 709
                elif value < -709: value = -709
                node.value = 1.0/(1.0+math.exp(-value))

    def calcuErrors(self):
        for node in self.nodes:
            if not self.parentLayer: #input layer
                node.error = 0.0
                continue
            if self.childLayer: #hidden layer
                err = 0.0
                for cnode in self.childLayer.nodes:
                    err += cnode.error * node.weights[cnode.no]
            else: #output layer
                err = node.targetValue - node.value
            node.error = err*node.value*(1.0-node.value)

    def adjustWeights(self):
        if self.childLayer:
            for node in self.nodes:
                for cnode in self.childLayer.nodes:
                    dw = self.learningRate*cnode.error*node.value
                    if self.useMomentum:
                        wc = node.weightChanges[cnode.no]
                        node.weights[cnode.no] += dw+self.momentumFactor*wc
                        node.weightChanges[cnode.no] = dw
                    else:
                        node.weights[cnode.no] += dw
                    cnode.biasWeight += self.learningRate*cnode.error*cnode.bias

class Network:

    def __init__(self,input_node_num,hidden_node_num,output_node_num):
        self.inputLayer = Layer(input_node_num)
        self.hiddenLayer = Layer(hidden_node_num)
        self.outputLayer = Layer(output_node_num)

        self.inputLayer.parentLayer = None
        self.inputLayer.childLayer = self.hiddenLayer

        self.hiddenLayer.parentLayer = self.inputLayer
        self.hiddenLayer.childLayer = self.outputLayer

        self.outputLayer.parentLayer = self.hiddenLayer
        self.outputLayer.childLayer = None

        self.inputLayer.setup()
        self.hiddenLayer.setup()
        self.outputLayer.setup()

    def setLearingRate(self,rate):
        self.inputLayer.learningRate = rate
        self.hiddenLayer.learningRate = rate
        self.outputLayer.learningRate = rate

    def setMomentum(self,use:bool,factor=0.5):
        self.inputLayer.useMomentum = use
        self.hiddenLayer.useMomentum = use
        self.outputLayer.useMomentum = use
        self.inputLayer.momentumFactor = factor
        self.hiddenLayer.momentumFactor = factor
        self.outputLayer.momentumFactor = factor

    def setInput(self,no,value):
        node = self.inputLayer.nodes[no]
        node.value = value

    def getOutput(self,no):
        node = self.outputLayer.nodes[no]
        return node.value

    def setTargetOutput(self,no,value):
        node = self.outputLayer.nodes[no]
        node.targetValue = value

    def feedForward(self):
        self.inputLayer.calcuValues()
        self.hiddenLayer.calcuValues()
        self.outputLayer.calcuValues()

    def backPropagate(self):
        self.outputLayer.calcuErrors()
        self.hiddenLayer.calcuErrors()

        self.hiddenLayer.adjustWeights()
        self.outputLayer.adjustWeights()

    def getMaxOutputNo(self):
        max_value = self.outputLayer.nodes[0].value
        max_no = 0
        for node in self.outputLayer.nodes:
            if node.value > max_value:
                max_value = node.value
                max_no = node.no
        return max_no

    def calcuError(self):
        err = 0.0
        for node in self.outputLayer.nodes:
            try:
                err += (node.value - node.targetValue)**2
            except OverflowError:
                print("OverflowError")
                exit()

        err = err / len(self.outputLayer.nodes)
        return err


def test():
    nw = Network(3,3,3)
    nw.setMomentum(False)

    nw.setInput(0,1.0)
    nw.setInput(1,0.0)
    nw.setInput(2,0.0)
    nw.setTargetOutput(0,0.1)
    nw.setTargetOutput(1,0.9)
    nw.setTargetOutput(2,0.1)

    for i in range(100000):
        nw.feedForward()
        nw.backPropagate()
        e = nw.calcuError()
        if (i+1) % 1000 == 0:
            print('n={},e={}'.format(i+1,e))
        if e < 0.000001:
            break
    print('n={},e={}'.format(i,e))
    result = []
    for node in nw.outputLayer.nodes:
        result.append(node.value)
    print(result)

def main():
    test()

if __name__ == '__main__':
    main()

 

PyTorchで学習したパラメータを自前ニューラルネットワークに読み込み

自前のニューラルネットワークで学習させることはできなかったのですが、PyTorchで学習させた重みを、自前のニューラルネットワークに組み込んでしまえばいいんじゃ?っと思い、やってみました。

前回、PyTorchでは出力層をソフトマックス関数にしてましたが、順序は変わらないはずなので、シグモイド関数にしてます。

またPyTorchのパラメータ(重み、バイアス)をテキストデータに書き出す処理も書きました。

[load_pytorch_param_to_simple_nn.py]

# -*- coding: utf-8 -*-
import os,random,time,pickle
import torch

import simple_nn

class MyNet(torch.nn.Module):
    def __init__(self):
        super(MyNet, self).__init__()
        self.fc1 = torch.nn.Linear(512, 1000)
        self.fc2 = torch.nn.Linear(1000, 10)

    def forward(self, x):
        x = self.fc1(x)
        x = torch.sigmoid(x)
        x = self.fc2(x)
        return torch.sigmoid(x)

def load_status():

    mynet = simple_nn.Network(512,1000,10)

    net = MyNet()
    net.load_state_dict(torch.load('mnist_model.pth'))

    state = net.state_dict()
    for i,ws in enumerate(state['fc1.weight']): #max_i = 999
        for j,w in enumerate(ws): #max_j = 511
            node = mynet.inputLayer.nodes[j]
            node.weights[i] = w

    for i,ws in enumerate(state['fc2.weight']): #max_i=9
        for j,w in enumerate(ws): #max_j = 999
            node = mynet.hiddenLayer.nodes[j]
            node.weights[i] = w

    for i,b in enumerate(state['fc1.bias']): #max_i = 999
        node = mynet.hiddenLayer.nodes[i]
        node.biasWeight = b

    for i,b in enumerate(state['fc2.bias']): #max_i = 511
        node = mynet.outputLayer.nodes[i]
        node.biasWeight = b
    print('set state!')

    f = open('mnist_testing.dat','rb')
    datasets = pickle.load(f)
    f.close()

    check_n = 10
    correct = 0
    start_time = time.time()
    for i in range(check_n):
        dataset = datasets[random.randint(0,9999)]
        data = dataset['data']
        for k,line in enumerate(data):
            for l,ch in enumerate(line):
                for m,value in enumerate(ch):
                    n = m*(8**2) + k*8 + l
                    mynet.setInput(n,value)
        print('set data! label = {}'.format(dataset['label']))
        mynet.feedForward()
        result = mynet.getMaxOutputNo()
        print('calculated! result = {}'.format(result))
        if dataset['label'] == result:
            correct += 1
    print('N={}, Accuracy:={}'.format(check_n,correct/check_n))
    print('Time {0}sec'.format(time.time()-start_time))

def save_status():
    net = MyNet()
    net.load_state_dict(torch.load('mnist_model.pth'))
    state = net.state_dict()
    layer_names = ['fc1','fc2']

    f = open('nn_adjusted_data.txt','w')
    for i in range(len(layer_names)):
        name = layer_names[i]+'.weight'
        f.write('weight layer {} n {}\n'.format(i+1,len(state[name])))
        for j,weights in enumerate(state[name]):
            f.write('node {}:'.format(j))
            for k,weight in enumerate(weights):
                if k == 0:
                    f.write('{}'.format(weight))
                else:
                    f.write(',{}'.format(weight))
            f.write('\n')
    for i in range(len(layer_names)):
        name = layer_names[i]+'.bias'
        f.write('bias layer {} n {}\n'.format(i+1,len(state[name])))
        for j,bias in enumerate(state[name]):
            f.write('node {}:{}\n'.format(j,bias))
    f.close()
    print('save! nn_adjusted_data.txt')

if __name__ == '__main__':
    load_status()
    #save_status()

 

自前ニューラルネットワークのCython化

上記のは上手くいきましたが、1回計算させるのに13秒くらいかかります。元々Cython化することが前提だったので、やってみると、1回0.004秒になりました。3000倍以上の高速化。大体50~100倍くらい速くなるとの認識でしたが、ここまで速くなるのは初めてです。

[fixed_nn.pyx]

# -*- coding: utf-8 -*-

from libc.math cimport exp
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free

cdef struct Node:
    double value
    double bias
    double *weights

cdef struct Layer:
    Node *nodes
    int number_of_nodes

cdef Layer *layers
cdef int number_of_layers

def init(int n0,int n1,int n2):
    global layers,number_of_layers
    cdef int i,j,k,l,m,n

    number_of_layers = 3
    layers = <Layer*> PyMem_Malloc(number_of_layers * sizeof(Layer))
    ns = [n0,n1,n2]
    for i in range(number_of_layers):
        layers[i].nodes = <Node*> PyMem_Malloc(ns[i] * sizeof(Node))
        layers[i].number_of_nodes = ns[i]

    for i in range(number_of_layers):
        for j in range(ns[i]):
            layers[i].nodes[j].value = 0.0
            layers[i].nodes[j].bias = 0.0
            if i != number_of_layers-1: #has child layer
                n = layers[i+1].number_of_nodes
                layers[i].nodes[j].weights = <double*> PyMem_Malloc(n * sizeof(double))
                for k in range(n):
                    layers[i].nodes[j].weights[k] = 0.0

def set_adjusted_data(fp):
    f = open(fp,'r')
    while True:
        line = f.readline()
        if not line:
            break
        info = line.split()
        mode = info[0]
        layer_idx = int(info[2])
        data_n = int(info[4])
        if mode == 'weight':
            for i in range(data_n):
                line = f.readline().strip()
                data = line.split(':')
                weights = data[1].split(',')
                for j in range(len(weights)):
                    layers[layer_idx-1].nodes[j].weights[i] = float(weights[j])
        elif mode == 'bias':
            for i in range(data_n):
                line = f.readline().strip()
                data = line.split(':')
                bias = float(data[1])
                layers[layer_idx].nodes[i].bias = bias

def set_input(array3d,h,w,c): # Height×Width×Channel
    cdef int i,j,k,l,m,n
    for i,row in enumerate(array3d):
        for j,col in enumerate(row):
            for k,value in enumerate(col):
                n = k*(c**2) + i*h + j
                layers[0].nodes[n].value = value

def feed_forward():
    cdef int i,j,k
    cdef double value,max_value,pv,pw

    for i in range(1,number_of_layers): #not input layer
        for j in range(layers[i].number_of_nodes):
            value = 0.0
            for k in range(layers[i-1].number_of_nodes):
                pv = layers[i-1].nodes[k].value
                pw = layers[i-1].nodes[k].weights[j]
                value += pv * pw
            value += layers[i].nodes[j].bias
            #activation function
            # sigmoid
            if value > 709: value = 709
            elif value < -709: value = -709
            layers[i].nodes[j].value = 1.0/(1.0+exp(-value))

def get_output(no):
    return layers[number_of_layers-1].nodes[no].value

def get_max_output_no():
    n = number_of_layers-1
    max_value = layers[n].nodes[0].value
    max_n = 0
    for i in range(1,layers[n].number_of_nodes):
        v = layers[n].nodes[i].value
        if max_value < v:
            max_value = v
            max_no = i
    return max_no

[test.py]

# -*- coding: utf-8 -*-
import pickle
import random
import time

import fixed_nn as nn

def main():
    nn.init(512,1000,10)
    nn.set_adjusted_data('nn_adjusted_data.txt')

    f = open('mnist_testing.dat','rb')
    datasets = pickle.load(f)
    f.close()

    correct = 0.0
    check_n = 100
    start_time = time.time()
    for i in range(check_n):
        dataset = datasets[random.randint(0,9999)]
        data = dataset['data']
        nn.set_input(data,8,8,8)
        nn.feed_forward()
        result = nn.get_max_output_no()
        if dataset['label'] == result:
            correct += 1

    print('N={}, Accuracy:={}'.format(check_n,correct/check_n))
    print('Time {0}sec'.format(time.time()-start_time))

if __name__ == '__main__':
    main()

 

Cythonのコードは、バックプロパゲーションによる重み調整も誤差計算もありません。シンプル。活性化関数はシグモイドしか書いてませんが、他の関数でも学習がないなら実装は簡単です。ファイル容量は70KB程度。CNNやらRNNならいざ知らず、単純なアルゴリズムなら、PyTorchで色々トライして、自前で実装という方法は悪くない気がしました。

 

一応、コンパイルしたpydも公開します。ほしい人はどうぞ。
Python2.7の32bit用と、Python3.6の64bitの2つをコンパイルしました。

ダウンロード先(OneDrive)
https://1drv.ms/u/s!Ak5J3V_CLLtbnQecCkxH8Xu-sRz7?e=FW4PIb

 
 

さて、元々の目的は、手書き数字の認識ライブラリまでもっていくことなので、画像を読み込んで、数字を返すところまでの実装です。細線化まではOpenCVで行うとして、ベクタ化処理をCythonで実装したいなぁ。

Updated: 2021年2月13日 — 05:36

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です