前回、PyTorchでニューラルネットワークを組んで手書き数字の認識を行いまして、9割くらいの正解率になりました。
昔、勉強にニューラルネットワークをPythonで組んだことがありますが、実用的な使い方がわからず、書いてみた仕舞いでしたが、初めて実用性を実感しました。
そこで、学習させたネットワークのライブラリ化ができないかなぁと思って、実施したことのメモです。
要約すると、実行ファイル(exe)化はできなかったので、PyTorchで学習させた重み等のパラメータを、Cythonで組んだ自前のニューラルネットワークライブラリに読み込ませるということを行いました。
環境:
Windows10 64bit
Python3.6.7
実行ファイル化の挑戦
まず最初に、実行ファイル化(exe)してみました。
py2exe
最初にずっと利用してきたpy2exeを使いました。試しにPyTorchをインポートするだけのファイルをexe化すると、容量は45MBほど。numpyが半分くらい占めてますかね。
問題は実行時にImportErrorになることです。
ImportError: (DLL load failed: 指定されたモジュールが見つかりません。) 'C:\\dist-dir\\torch._C.pyd'
torch._C.pydが無いと言われるのだけど、その場所にあるんですよね。
解決法が見出せませんので、あきらめました。ちなみに環境変数のPATHを追加してみても同じでした。
cx_freeze
次に初めてcx_freezeを使いましたが、デフォルトでexe化すると容量が3.72GB。
Pythonのインストールフォルダの容量が3.49GBなので、全部もってきたんか。なんだそりゃ。
PyInstallerもインストールされているモジュールを全て含む仕様とのことで、試していません。PyInstallerではexe化する際は、最低限のモジュールだけを入れた環境で実施するのが良いとのことですが、cx_freezeも同じですかね。
配布するソフト毎に環境を作るなんてできないので、あきらめました。
自前ニューラルネットワークの挑戦
過去に組んだニューラルネットワークでPyTorchで行ったのと同様の学習をさせてみました。
Pure Pythonで組んでいるので激遅のため、3000文字ほどしか学習させませんでしたが、一向に誤差が減らず、学習していない様子なのであきらめました。結構色々粘りましたが。
PyTorchの場合は、3000文字学習した時点で7割くらいの正解率になっているので、誤差の修正方法が違うとしても、まったく学習しないのは変なので、コードが間違っているかもしれません。
でも、3x3x3など少ないノード数でテストすると、狙った値に調整されるので、もしからしたら学習が足りてないだけかもしれませんので、一応、ソースコードを載せておきます。
ソースコードは下記の書籍のC言語のコードをPython化したものです。中身を理解することが目的なので、遅いけど分かりやすいforループで組んでいます。活性化関数はシグモイド関数オンリーです。
[simple_nn.py]
import math,random
class Node:
def __init__(self,no):
self.no = no
self.value = 0.0
self.targetValue= 0.0
self.bias = 1.0
self.biasWeight = 0.0
self.error = 0.0
def setLayer(self,layer):
self.layer = layer
self.weights = []
self.weightChanges = []
if layer.childLayer:
for n in layer.childLayer.nodes:
self.weights.append(0.0)
self.weightChanges.append(0.0)
class Layer:
learningRate = 0.25
useMomentum = True
momentumFactor = 0.5
parentLayer = None
childLayer = None
nodes = []
def __init__(self,nodes_num):
self.parentLayer = None
self.childLayer = None
self.nodes = []
for i in range(nodes_num):
node = Node(i)
self.nodes.append(node)
def setup(self):
for node in self.nodes:
node.setLayer(self)
if self.childLayer:
self.randWeights()
def randWeights(self):
for node in self.nodes:
node.biasWeight = random.random()*2-1
for cnode in node.layer.childLayer.nodes:
node.weights[cnode.no] = random.random()*2-1
def calcuValues(self):
if self.parentLayer:
max_value = -math.inf
for node in self.nodes:
value = 0.0
for pnode in self.parentLayer.nodes:
value += pnode.value * pnode.weights[node.no]
value += node.bias * node.biasWeight
if value > 709: value = 709
elif value < -709: value = -709
node.value = 1.0/(1.0+math.exp(-value))
def calcuErrors(self):
for node in self.nodes:
if not self.parentLayer: #input layer
node.error = 0.0
continue
if self.childLayer: #hidden layer
err = 0.0
for cnode in self.childLayer.nodes:
err += cnode.error * node.weights[cnode.no]
else: #output layer
err = node.targetValue - node.value
node.error = err*node.value*(1.0-node.value)
def adjustWeights(self):
if self.childLayer:
for node in self.nodes:
for cnode in self.childLayer.nodes:
dw = self.learningRate*cnode.error*node.value
if self.useMomentum:
wc = node.weightChanges[cnode.no]
node.weights[cnode.no] += dw+self.momentumFactor*wc
node.weightChanges[cnode.no] = dw
else:
node.weights[cnode.no] += dw
cnode.biasWeight += self.learningRate*cnode.error*cnode.bias
class Network:
def __init__(self,input_node_num,hidden_node_num,output_node_num):
self.inputLayer = Layer(input_node_num)
self.hiddenLayer = Layer(hidden_node_num)
self.outputLayer = Layer(output_node_num)
self.inputLayer.parentLayer = None
self.inputLayer.childLayer = self.hiddenLayer
self.hiddenLayer.parentLayer = self.inputLayer
self.hiddenLayer.childLayer = self.outputLayer
self.outputLayer.parentLayer = self.hiddenLayer
self.outputLayer.childLayer = None
self.inputLayer.setup()
self.hiddenLayer.setup()
self.outputLayer.setup()
def setLearingRate(self,rate):
self.inputLayer.learningRate = rate
self.hiddenLayer.learningRate = rate
self.outputLayer.learningRate = rate
def setMomentum(self,use:bool,factor=0.5):
self.inputLayer.useMomentum = use
self.hiddenLayer.useMomentum = use
self.outputLayer.useMomentum = use
self.inputLayer.momentumFactor = factor
self.hiddenLayer.momentumFactor = factor
self.outputLayer.momentumFactor = factor
def setInput(self,no,value):
node = self.inputLayer.nodes[no]
node.value = value
def getOutput(self,no):
node = self.outputLayer.nodes[no]
return node.value
def setTargetOutput(self,no,value):
node = self.outputLayer.nodes[no]
node.targetValue = value
def feedForward(self):
self.inputLayer.calcuValues()
self.hiddenLayer.calcuValues()
self.outputLayer.calcuValues()
def backPropagate(self):
self.outputLayer.calcuErrors()
self.hiddenLayer.calcuErrors()
self.hiddenLayer.adjustWeights()
self.outputLayer.adjustWeights()
def getMaxOutputNo(self):
max_value = self.outputLayer.nodes[0].value
max_no = 0
for node in self.outputLayer.nodes:
if node.value > max_value:
max_value = node.value
max_no = node.no
return max_no
def calcuError(self):
err = 0.0
for node in self.outputLayer.nodes:
try:
err += (node.value - node.targetValue)**2
except OverflowError:
print("OverflowError")
exit()
err = err / len(self.outputLayer.nodes)
return err
def test():
nw = Network(3,3,3)
nw.setMomentum(False)
nw.setInput(0,1.0)
nw.setInput(1,0.0)
nw.setInput(2,0.0)
nw.setTargetOutput(0,0.1)
nw.setTargetOutput(1,0.9)
nw.setTargetOutput(2,0.1)
for i in range(100000):
nw.feedForward()
nw.backPropagate()
e = nw.calcuError()
if (i+1) % 1000 == 0:
print('n={},e={}'.format(i+1,e))
if e < 0.000001:
break
print('n={},e={}'.format(i,e))
result = []
for node in nw.outputLayer.nodes:
result.append(node.value)
print(result)
def main():
test()
if __name__ == '__main__':
main()
PyTorchで学習したパラメータを自前ニューラルネットワークに読み込み
自前のニューラルネットワークで学習させることはできなかったのですが、PyTorchで学習させた重みを、自前のニューラルネットワークに組み込んでしまえばいいんじゃ?っと思い、やってみました。
前回、PyTorchでは出力層をソフトマックス関数にしてましたが、順序は変わらないはずなので、シグモイド関数にしてます。
またPyTorchのパラメータ(重み、バイアス)をテキストデータに書き出す処理も書きました。
[load_pytorch_param_to_simple_nn.py]
# -*- coding: utf-8 -*-
import os,random,time,pickle
import torch
import simple_nn
class MyNet(torch.nn.Module):
def __init__(self):
super(MyNet, self).__init__()
self.fc1 = torch.nn.Linear(512, 1000)
self.fc2 = torch.nn.Linear(1000, 10)
def forward(self, x):
x = self.fc1(x)
x = torch.sigmoid(x)
x = self.fc2(x)
return torch.sigmoid(x)
def load_status():
mynet = simple_nn.Network(512,1000,10)
net = MyNet()
net.load_state_dict(torch.load('mnist_model.pth'))
state = net.state_dict()
for i,ws in enumerate(state['fc1.weight']): #max_i = 999
for j,w in enumerate(ws): #max_j = 511
node = mynet.inputLayer.nodes[j]
node.weights[i] = w
for i,ws in enumerate(state['fc2.weight']): #max_i=9
for j,w in enumerate(ws): #max_j = 999
node = mynet.hiddenLayer.nodes[j]
node.weights[i] = w
for i,b in enumerate(state['fc1.bias']): #max_i = 999
node = mynet.hiddenLayer.nodes[i]
node.biasWeight = b
for i,b in enumerate(state['fc2.bias']): #max_i = 511
node = mynet.outputLayer.nodes[i]
node.biasWeight = b
print('set state!')
f = open('mnist_testing.dat','rb')
datasets = pickle.load(f)
f.close()
check_n = 10
correct = 0
start_time = time.time()
for i in range(check_n):
dataset = datasets[random.randint(0,9999)]
data = dataset['data']
for k,line in enumerate(data):
for l,ch in enumerate(line):
for m,value in enumerate(ch):
n = m*(8**2) + k*8 + l
mynet.setInput(n,value)
print('set data! label = {}'.format(dataset['label']))
mynet.feedForward()
result = mynet.getMaxOutputNo()
print('calculated! result = {}'.format(result))
if dataset['label'] == result:
correct += 1
print('N={}, Accuracy:={}'.format(check_n,correct/check_n))
print('Time {0}sec'.format(time.time()-start_time))
def save_status():
net = MyNet()
net.load_state_dict(torch.load('mnist_model.pth'))
state = net.state_dict()
layer_names = ['fc1','fc2']
f = open('nn_adjusted_data.txt','w')
for i in range(len(layer_names)):
name = layer_names[i]+'.weight'
f.write('weight layer {} n {}\n'.format(i+1,len(state[name])))
for j,weights in enumerate(state[name]):
f.write('node {}:'.format(j))
for k,weight in enumerate(weights):
if k == 0:
f.write('{}'.format(weight))
else:
f.write(',{}'.format(weight))
f.write('\n')
for i in range(len(layer_names)):
name = layer_names[i]+'.bias'
f.write('bias layer {} n {}\n'.format(i+1,len(state[name])))
for j,bias in enumerate(state[name]):
f.write('node {}:{}\n'.format(j,bias))
f.close()
print('save! nn_adjusted_data.txt')
if __name__ == '__main__':
load_status()
#save_status()
自前ニューラルネットワークのCython化
上記のは上手くいきましたが、1回計算させるのに13秒くらいかかります。元々Cython化することが前提だったので、やってみると、1回0.004秒になりました。3000倍以上の高速化。大体50~100倍くらい速くなるとの認識でしたが、ここまで速くなるのは初めてです。
[fixed_nn.pyx]
# -*- coding: utf-8 -*-
from libc.math cimport exp
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
cdef struct Node:
double value
double bias
double *weights
cdef struct Layer:
Node *nodes
int number_of_nodes
cdef Layer *layers
cdef int number_of_layers
def init(int n0,int n1,int n2):
global layers,number_of_layers
cdef int i,j,k,l,m,n
number_of_layers = 3
layers = <Layer*> PyMem_Malloc(number_of_layers * sizeof(Layer))
ns = [n0,n1,n2]
for i in range(number_of_layers):
layers[i].nodes = <Node*> PyMem_Malloc(ns[i] * sizeof(Node))
layers[i].number_of_nodes = ns[i]
for i in range(number_of_layers):
for j in range(ns[i]):
layers[i].nodes[j].value = 0.0
layers[i].nodes[j].bias = 0.0
if i != number_of_layers-1: #has child layer
n = layers[i+1].number_of_nodes
layers[i].nodes[j].weights = <double*> PyMem_Malloc(n * sizeof(double))
for k in range(n):
layers[i].nodes[j].weights[k] = 0.0
def set_adjusted_data(fp):
f = open(fp,'r')
while True:
line = f.readline()
if not line:
break
info = line.split()
mode = info[0]
layer_idx = int(info[2])
data_n = int(info[4])
if mode == 'weight':
for i in range(data_n):
line = f.readline().strip()
data = line.split(':')
weights = data[1].split(',')
for j in range(len(weights)):
layers[layer_idx-1].nodes[j].weights[i] = float(weights[j])
elif mode == 'bias':
for i in range(data_n):
line = f.readline().strip()
data = line.split(':')
bias = float(data[1])
layers[layer_idx].nodes[i].bias = bias
def set_input(array3d,h,w,c): # Height×Width×Channel
cdef int i,j,k,l,m,n
for i,row in enumerate(array3d):
for j,col in enumerate(row):
for k,value in enumerate(col):
n = k*(c**2) + i*h + j
layers[0].nodes[n].value = value
def feed_forward():
cdef int i,j,k
cdef double value,max_value,pv,pw
for i in range(1,number_of_layers): #not input layer
for j in range(layers[i].number_of_nodes):
value = 0.0
for k in range(layers[i-1].number_of_nodes):
pv = layers[i-1].nodes[k].value
pw = layers[i-1].nodes[k].weights[j]
value += pv * pw
value += layers[i].nodes[j].bias
#activation function
# sigmoid
if value > 709: value = 709
elif value < -709: value = -709
layers[i].nodes[j].value = 1.0/(1.0+exp(-value))
def get_output(no):
return layers[number_of_layers-1].nodes[no].value
def get_max_output_no():
n = number_of_layers-1
max_value = layers[n].nodes[0].value
max_n = 0
for i in range(1,layers[n].number_of_nodes):
v = layers[n].nodes[i].value
if max_value < v:
max_value = v
max_no = i
return max_no
[test.py]
# -*- coding: utf-8 -*-
import pickle
import random
import time
import fixed_nn as nn
def main():
nn.init(512,1000,10)
nn.set_adjusted_data('nn_adjusted_data.txt')
f = open('mnist_testing.dat','rb')
datasets = pickle.load(f)
f.close()
correct = 0.0
check_n = 100
start_time = time.time()
for i in range(check_n):
dataset = datasets[random.randint(0,9999)]
data = dataset['data']
nn.set_input(data,8,8,8)
nn.feed_forward()
result = nn.get_max_output_no()
if dataset['label'] == result:
correct += 1
print('N={}, Accuracy:={}'.format(check_n,correct/check_n))
print('Time {0}sec'.format(time.time()-start_time))
if __name__ == '__main__':
main()
Cythonのコードは、バックプロパゲーションによる重み調整も誤差計算もありません。シンプル。活性化関数はシグモイドしか書いてませんが、他の関数でも学習がないなら実装は簡単です。ファイル容量は70KB程度。CNNやらRNNならいざ知らず、単純なアルゴリズムなら、PyTorchで色々トライして、自前で実装という方法は悪くない気がしました。
一応、コンパイルしたpydも公開します。ほしい人はどうぞ。
Python2.7の32bit用と、Python3.6の64bitの2つをコンパイルしました。
ダウンロード先(OneDrive)
https://1drv.ms/u/s!Ak5J3V_CLLtbnQecCkxH8Xu-sRz7?e=FW4PIb
さて、元々の目的は、手書き数字の認識ライブラリまでもっていくことなので、画像を読み込んで、数字を返すところまでの実装です。細線化まではOpenCVで行うとして、ベクタ化処理をCythonで実装したいなぁ。
