前回、PyTorchでニューラルネットワークを組んで手書き数字の認識を行いまして、9割くらいの正解率になりました。
昔、勉強にニューラルネットワークをPythonで組んだことがありますが、実用的な使い方がわからず、書いてみた仕舞いでしたが、初めて実用性を実感しました。
そこで、学習させたネットワークのライブラリ化ができないかなぁと思って、実施したことのメモです。
要約すると、実行ファイル(exe)化はできなかったので、PyTorchで学習させた重み等のパラメータを、Cythonで組んだ自前のニューラルネットワークライブラリに読み込ませるということを行いました。
環境:
Windows10 64bit
Python3.6.7
実行ファイル化の挑戦
まず最初に、実行ファイル化(exe)してみました。
py2exe
最初にずっと利用してきたpy2exeを使いました。試しにPyTorchをインポートするだけのファイルをexe化すると、容量は45MBほど。numpyが半分くらい占めてますかね。
問題は実行時にImportErrorになることです。
ImportError: (DLL load failed: 指定されたモジュールが見つかりません。) 'C:\\dist-dir\\torch._C.pyd'
torch._C.pydが無いと言われるのだけど、その場所にあるんですよね。
解決法が見出せませんので、あきらめました。ちなみに環境変数のPATHを追加してみても同じでした。
cx_freeze
次に初めてcx_freezeを使いましたが、デフォルトでexe化すると容量が3.72GB。
Pythonのインストールフォルダの容量が3.49GBなので、全部もってきたんか。なんだそりゃ。
PyInstallerもインストールされているモジュールを全て含む仕様とのことで、試していません。PyInstallerではexe化する際は、最低限のモジュールだけを入れた環境で実施するのが良いとのことですが、cx_freezeも同じですかね。
配布するソフト毎に環境を作るなんてできないので、あきらめました。
自前ニューラルネットワークの挑戦
過去に組んだニューラルネットワークでPyTorchで行ったのと同様の学習をさせてみました。
Pure Pythonで組んでいるので激遅のため、3000文字ほどしか学習させませんでしたが、一向に誤差が減らず、学習していない様子なのであきらめました。結構色々粘りましたが。
PyTorchの場合は、3000文字学習した時点で7割くらいの正解率になっているので、誤差の修正方法が違うとしても、まったく学習しないのは変なので、コードが間違っているかもしれません。
でも、3x3x3など少ないノード数でテストすると、狙った値に調整されるので、もしからしたら学習が足りてないだけかもしれませんので、一応、ソースコードを載せておきます。
ソースコードは下記の書籍のC言語のコードをPython化したものです。中身を理解することが目的なので、遅いけど分かりやすいforループで組んでいます。活性化関数はシグモイド関数オンリーです。
[simple_nn.py]
import math,random class Node: def __init__(self,no): self.no = no self.value = 0.0 self.targetValue= 0.0 self.bias = 1.0 self.biasWeight = 0.0 self.error = 0.0 def setLayer(self,layer): self.layer = layer self.weights = [] self.weightChanges = [] if layer.childLayer: for n in layer.childLayer.nodes: self.weights.append(0.0) self.weightChanges.append(0.0) class Layer: learningRate = 0.25 useMomentum = True momentumFactor = 0.5 parentLayer = None childLayer = None nodes = [] def __init__(self,nodes_num): self.parentLayer = None self.childLayer = None self.nodes = [] for i in range(nodes_num): node = Node(i) self.nodes.append(node) def setup(self): for node in self.nodes: node.setLayer(self) if self.childLayer: self.randWeights() def randWeights(self): for node in self.nodes: node.biasWeight = random.random()*2-1 for cnode in node.layer.childLayer.nodes: node.weights[cnode.no] = random.random()*2-1 def calcuValues(self): if self.parentLayer: max_value = -math.inf for node in self.nodes: value = 0.0 for pnode in self.parentLayer.nodes: value += pnode.value * pnode.weights[node.no] value += node.bias * node.biasWeight if value > 709: value = 709 elif value < -709: value = -709 node.value = 1.0/(1.0+math.exp(-value)) def calcuErrors(self): for node in self.nodes: if not self.parentLayer: #input layer node.error = 0.0 continue if self.childLayer: #hidden layer err = 0.0 for cnode in self.childLayer.nodes: err += cnode.error * node.weights[cnode.no] else: #output layer err = node.targetValue - node.value node.error = err*node.value*(1.0-node.value) def adjustWeights(self): if self.childLayer: for node in self.nodes: for cnode in self.childLayer.nodes: dw = self.learningRate*cnode.error*node.value if self.useMomentum: wc = node.weightChanges[cnode.no] node.weights[cnode.no] += dw+self.momentumFactor*wc node.weightChanges[cnode.no] = dw else: node.weights[cnode.no] += dw cnode.biasWeight += self.learningRate*cnode.error*cnode.bias class Network: def __init__(self,input_node_num,hidden_node_num,output_node_num): self.inputLayer = Layer(input_node_num) self.hiddenLayer = Layer(hidden_node_num) self.outputLayer = Layer(output_node_num) self.inputLayer.parentLayer = None self.inputLayer.childLayer = self.hiddenLayer self.hiddenLayer.parentLayer = self.inputLayer self.hiddenLayer.childLayer = self.outputLayer self.outputLayer.parentLayer = self.hiddenLayer self.outputLayer.childLayer = None self.inputLayer.setup() self.hiddenLayer.setup() self.outputLayer.setup() def setLearingRate(self,rate): self.inputLayer.learningRate = rate self.hiddenLayer.learningRate = rate self.outputLayer.learningRate = rate def setMomentum(self,use:bool,factor=0.5): self.inputLayer.useMomentum = use self.hiddenLayer.useMomentum = use self.outputLayer.useMomentum = use self.inputLayer.momentumFactor = factor self.hiddenLayer.momentumFactor = factor self.outputLayer.momentumFactor = factor def setInput(self,no,value): node = self.inputLayer.nodes[no] node.value = value def getOutput(self,no): node = self.outputLayer.nodes[no] return node.value def setTargetOutput(self,no,value): node = self.outputLayer.nodes[no] node.targetValue = value def feedForward(self): self.inputLayer.calcuValues() self.hiddenLayer.calcuValues() self.outputLayer.calcuValues() def backPropagate(self): self.outputLayer.calcuErrors() self.hiddenLayer.calcuErrors() self.hiddenLayer.adjustWeights() self.outputLayer.adjustWeights() def getMaxOutputNo(self): max_value = self.outputLayer.nodes[0].value max_no = 0 for node in self.outputLayer.nodes: if node.value > max_value: max_value = node.value max_no = node.no return max_no def calcuError(self): err = 0.0 for node in self.outputLayer.nodes: try: err += (node.value - node.targetValue)**2 except OverflowError: print("OverflowError") exit() err = err / len(self.outputLayer.nodes) return err def test(): nw = Network(3,3,3) nw.setMomentum(False) nw.setInput(0,1.0) nw.setInput(1,0.0) nw.setInput(2,0.0) nw.setTargetOutput(0,0.1) nw.setTargetOutput(1,0.9) nw.setTargetOutput(2,0.1) for i in range(100000): nw.feedForward() nw.backPropagate() e = nw.calcuError() if (i+1) % 1000 == 0: print('n={},e={}'.format(i+1,e)) if e < 0.000001: break print('n={},e={}'.format(i,e)) result = [] for node in nw.outputLayer.nodes: result.append(node.value) print(result) def main(): test() if __name__ == '__main__': main()
PyTorchで学習したパラメータを自前ニューラルネットワークに読み込み
自前のニューラルネットワークで学習させることはできなかったのですが、PyTorchで学習させた重みを、自前のニューラルネットワークに組み込んでしまえばいいんじゃ?っと思い、やってみました。
前回、PyTorchでは出力層をソフトマックス関数にしてましたが、順序は変わらないはずなので、シグモイド関数にしてます。
またPyTorchのパラメータ(重み、バイアス)をテキストデータに書き出す処理も書きました。
[load_pytorch_param_to_simple_nn.py]
# -*- coding: utf-8 -*- import os,random,time,pickle import torch import simple_nn class MyNet(torch.nn.Module): def __init__(self): super(MyNet, self).__init__() self.fc1 = torch.nn.Linear(512, 1000) self.fc2 = torch.nn.Linear(1000, 10) def forward(self, x): x = self.fc1(x) x = torch.sigmoid(x) x = self.fc2(x) return torch.sigmoid(x) def load_status(): mynet = simple_nn.Network(512,1000,10) net = MyNet() net.load_state_dict(torch.load('mnist_model.pth')) state = net.state_dict() for i,ws in enumerate(state['fc1.weight']): #max_i = 999 for j,w in enumerate(ws): #max_j = 511 node = mynet.inputLayer.nodes[j] node.weights[i] = w for i,ws in enumerate(state['fc2.weight']): #max_i=9 for j,w in enumerate(ws): #max_j = 999 node = mynet.hiddenLayer.nodes[j] node.weights[i] = w for i,b in enumerate(state['fc1.bias']): #max_i = 999 node = mynet.hiddenLayer.nodes[i] node.biasWeight = b for i,b in enumerate(state['fc2.bias']): #max_i = 511 node = mynet.outputLayer.nodes[i] node.biasWeight = b print('set state!') f = open('mnist_testing.dat','rb') datasets = pickle.load(f) f.close() check_n = 10 correct = 0 start_time = time.time() for i in range(check_n): dataset = datasets[random.randint(0,9999)] data = dataset['data'] for k,line in enumerate(data): for l,ch in enumerate(line): for m,value in enumerate(ch): n = m*(8**2) + k*8 + l mynet.setInput(n,value) print('set data! label = {}'.format(dataset['label'])) mynet.feedForward() result = mynet.getMaxOutputNo() print('calculated! result = {}'.format(result)) if dataset['label'] == result: correct += 1 print('N={}, Accuracy:={}'.format(check_n,correct/check_n)) print('Time {0}sec'.format(time.time()-start_time)) def save_status(): net = MyNet() net.load_state_dict(torch.load('mnist_model.pth')) state = net.state_dict() layer_names = ['fc1','fc2'] f = open('nn_adjusted_data.txt','w') for i in range(len(layer_names)): name = layer_names[i]+'.weight' f.write('weight layer {} n {}\n'.format(i+1,len(state[name]))) for j,weights in enumerate(state[name]): f.write('node {}:'.format(j)) for k,weight in enumerate(weights): if k == 0: f.write('{}'.format(weight)) else: f.write(',{}'.format(weight)) f.write('\n') for i in range(len(layer_names)): name = layer_names[i]+'.bias' f.write('bias layer {} n {}\n'.format(i+1,len(state[name]))) for j,bias in enumerate(state[name]): f.write('node {}:{}\n'.format(j,bias)) f.close() print('save! nn_adjusted_data.txt') if __name__ == '__main__': load_status() #save_status()
自前ニューラルネットワークのCython化
上記のは上手くいきましたが、1回計算させるのに13秒くらいかかります。元々Cython化することが前提だったので、やってみると、1回0.004秒になりました。3000倍以上の高速化。大体50~100倍くらい速くなるとの認識でしたが、ここまで速くなるのは初めてです。
[fixed_nn.pyx]
# -*- coding: utf-8 -*- from libc.math cimport exp from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free cdef struct Node: double value double bias double *weights cdef struct Layer: Node *nodes int number_of_nodes cdef Layer *layers cdef int number_of_layers def init(int n0,int n1,int n2): global layers,number_of_layers cdef int i,j,k,l,m,n number_of_layers = 3 layers = <Layer*> PyMem_Malloc(number_of_layers * sizeof(Layer)) ns = [n0,n1,n2] for i in range(number_of_layers): layers[i].nodes = <Node*> PyMem_Malloc(ns[i] * sizeof(Node)) layers[i].number_of_nodes = ns[i] for i in range(number_of_layers): for j in range(ns[i]): layers[i].nodes[j].value = 0.0 layers[i].nodes[j].bias = 0.0 if i != number_of_layers-1: #has child layer n = layers[i+1].number_of_nodes layers[i].nodes[j].weights = <double*> PyMem_Malloc(n * sizeof(double)) for k in range(n): layers[i].nodes[j].weights[k] = 0.0 def set_adjusted_data(fp): f = open(fp,'r') while True: line = f.readline() if not line: break info = line.split() mode = info[0] layer_idx = int(info[2]) data_n = int(info[4]) if mode == 'weight': for i in range(data_n): line = f.readline().strip() data = line.split(':') weights = data[1].split(',') for j in range(len(weights)): layers[layer_idx-1].nodes[j].weights[i] = float(weights[j]) elif mode == 'bias': for i in range(data_n): line = f.readline().strip() data = line.split(':') bias = float(data[1]) layers[layer_idx].nodes[i].bias = bias def set_input(array3d,h,w,c): # Height×Width×Channel cdef int i,j,k,l,m,n for i,row in enumerate(array3d): for j,col in enumerate(row): for k,value in enumerate(col): n = k*(c**2) + i*h + j layers[0].nodes[n].value = value def feed_forward(): cdef int i,j,k cdef double value,max_value,pv,pw for i in range(1,number_of_layers): #not input layer for j in range(layers[i].number_of_nodes): value = 0.0 for k in range(layers[i-1].number_of_nodes): pv = layers[i-1].nodes[k].value pw = layers[i-1].nodes[k].weights[j] value += pv * pw value += layers[i].nodes[j].bias #activation function # sigmoid if value > 709: value = 709 elif value < -709: value = -709 layers[i].nodes[j].value = 1.0/(1.0+exp(-value)) def get_output(no): return layers[number_of_layers-1].nodes[no].value def get_max_output_no(): n = number_of_layers-1 max_value = layers[n].nodes[0].value max_n = 0 for i in range(1,layers[n].number_of_nodes): v = layers[n].nodes[i].value if max_value < v: max_value = v max_no = i return max_no
[test.py]
# -*- coding: utf-8 -*- import pickle import random import time import fixed_nn as nn def main(): nn.init(512,1000,10) nn.set_adjusted_data('nn_adjusted_data.txt') f = open('mnist_testing.dat','rb') datasets = pickle.load(f) f.close() correct = 0.0 check_n = 100 start_time = time.time() for i in range(check_n): dataset = datasets[random.randint(0,9999)] data = dataset['data'] nn.set_input(data,8,8,8) nn.feed_forward() result = nn.get_max_output_no() if dataset['label'] == result: correct += 1 print('N={}, Accuracy:={}'.format(check_n,correct/check_n)) print('Time {0}sec'.format(time.time()-start_time)) if __name__ == '__main__': main()
Cythonのコードは、バックプロパゲーションによる重み調整も誤差計算もありません。シンプル。活性化関数はシグモイドしか書いてませんが、他の関数でも学習がないなら実装は簡単です。ファイル容量は70KB程度。CNNやらRNNならいざ知らず、単純なアルゴリズムなら、PyTorchで色々トライして、自前で実装という方法は悪くない気がしました。
一応、コンパイルしたpydも公開します。ほしい人はどうぞ。
Python2.7の32bit用と、Python3.6の64bitの2つをコンパイルしました。
ダウンロード先(OneDrive)
https://1drv.ms/u/s!Ak5J3V_CLLtbnQecCkxH8Xu-sRz7?e=FW4PIb
さて、元々の目的は、手書き数字の認識ライブラリまでもっていくことなので、画像を読み込んで、数字を返すところまでの実装です。細線化まではOpenCVで行うとして、ベクタ化処理をCythonで実装したいなぁ。