您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關Tensorflow中多線程與多進程數據加載的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
1. 多線程數據讀取
第一種方法是可以直接從csv里讀取數據,但返回值是tensor,需要在sess里run一下才能返回真實值,無法實現真正的并行處理,但如果直接用csv文件或其他什么文件存了特征值,可以直接讀取后進行訓練,可使用這種方法.
import tensorflow as tf #這里是返回的數據類型,具體內容無所謂,類型對應就好了,比如我這個,就是一個四維的向量,前三維是字符串類型 最后一維是int類型 record_defaults = [[""], [""], [""], [0]] def decode_csv(line): parsed_line = tf.decode_csv(line, record_defaults) label = parsed_line[-1] # label del parsed_line[-1] # delete the last element from the list features = tf.stack(parsed_line) # Stack features so that you can later vectorize forward prop., etc. #label = tf.stack(label) #NOT needed. Only if more than 1 column makes the label... batch_to_return = features, label return batch_to_return filenames = tf.placeholder(tf.string, shape=[None]) dataset5 = tf.data.Dataset.from_tensor_slices(filenames) #在這里設置線程數目 dataset5 = dataset5.flat_map(lambda filename: tf.data.TextLineDataset(filename).skip(1).map(decode_csv,num_parallel_calls=15)) dataset5 = dataset5.shuffle(buffer_size=1000) dataset5 = dataset5.batch(32) #batch_size iterator5 = dataset5.make_initializable_iterator() next_element5 = iterator5.get_next() #這里是需要加載的文件名 training_filenames = ["train.csv"] validation_filenames = ["vali.csv"] with tf.Session() as sess: for _ in range(2): #通過文件名初始化迭代器 sess.run(iterator5.initializer, feed_dict={filenames: training_filenames}) while True: try: #這里獲得真實值 features, labels = sess.run(next_element5) # Train... # print("(train) features: ") # print(features) # print("(train) labels: ") # print(labels) except tf.errors.OutOfRangeError: print("Out of range error triggered (looped through training set 1 time)") break # Validate (cost, accuracy) on train set print("\nDone with the first iterator\n") sess.run(iterator5.initializer, feed_dict={filenames: validation_filenames}) while True: try: features, labels = sess.run(next_element5) # Validate (cost, accuracy) on dev set # print("(dev) features: ") # print(features) # print("(dev) labels: ") # print(labels) except tf.errors.OutOfRangeError: print("Out of range error triggered (looped through dev set 1 time only)") break
第二種方法,基于生成器,可以進行預處理操作了,sess里run出來的結果可以直接進行輸入訓練,但需要自己寫一個生成器,我使用的測試代碼如下:
import tensorflow as tf import random import threading import numpy as np from data import load_image,load_wave class SequenceData(): def __init__(self, path, batch_size=32): self.path = path self.batch_size = batch_size f = open(path) self.datas = f.readlines() self.L = len(self.datas) self.index = random.sample(range(self.L), self.L) def __len__(self): return self.L - self.batch_size def __getitem__(self, idx): batch_indexs = self.index[idx:(idx+self.batch_size)] batch_datas = [self.datas[k] for k in batch_indexs] img1s,img2s,audios,labels = self.data_generation(batch_datas) return img1s,img2s,audios,labels def gen(self): for i in range(100000): t = self.__getitem__(i) yield t def data_generation(self, batch_datas): #預處理操作,數據在參數里 return img1s,img2s,audios,labels #這里的type要和實際返回的數據類型對應,如果在自己的處理代碼里已經考慮的batchszie,那這里的batch設為1即可 dataset = tf.data.Dataset().batch(1).from_generator(SequenceData('train.csv').gen, output_types= (tf.float32,tf.float32,tf.float32,tf.int64)) dataset = dataset.map(lambda x,y,z,w : (x,y,z,w), num_parallel_calls=32).prefetch(buffer_size=1000) X, y,z,w = dataset.make_one_shot_iterator().get_next() with tf.Session() as sess: for _ in range(100000): a,b,c,d = sess.run([X,y,z,w]) print(a.shape)
不過python的多線程并不是真正的多線程,雖然看起來我是啟動了32線程,但運行時的CPU占用如下所示:
還剩這么多核心空著,然后就是第三個版本了,使用了queue來緩存數據,訓練需要數據時直接從queue中進行讀取,是一個到多進程的過度版本(vscode沒法debug多進程,坑啊,還以為代碼寫錯了,在vscode里多進程直接就沒法運行),在初始化時啟動多個線程進行數據的預處理:
import tensorflow as tf import random import threading import numpy as np from data import load_image,load_wave from queue import Queue class SequenceData(): def __init__(self, path, batch_size=32): self.path = path self.batch_size = batch_size f = open(path) self.datas = f.readlines() self.L = len(self.datas) self.index = random.sample(range(self.L), self.L) self.queue = Queue(maxsize=20) for i in range(32): threading.Thread(target=self.f).start() def __len__(self): return self.L - self.batch_size def __getitem__(self, idx): batch_indexs = self.index[idx:(idx+self.batch_size)] batch_datas = [self.datas[k] for k in batch_indexs] img1s,img2s,audios,labels = self.data_generation(batch_datas) return img1s,img2s,audios,labels def f(self): for i in range(int(self.__len__()/self.batch_size)): t = self.__getitem__(i) self.queue.put(t) def gen(self): while 1: yield self.queue.get() def data_generation(self, batch_datas): #數據預處理操作 return img1s,img2s,audios,labels #這里的type要和實際返回的數據類型對應,如果在自己的處理代碼里已經考慮的batchszie,那這里的batch設為1即可 dataset = tf.data.Dataset().batch(1).from_generator(SequenceData('train.csv').gen, output_types= (tf.float32,tf.float32,tf.float32,tf.int64)) dataset = dataset.map(lambda x,y,z,w : (x,y,z,w), num_parallel_calls=1).prefetch(buffer_size=1000) X, y,z,w = dataset.make_one_shot_iterator().get_next() with tf.Session() as sess: for _ in range(100000): a,b,c,d = sess.run([X,y,z,w]) print(a.shape)
2. 多進程數據讀取
這里的代碼和多線程的第三個版本非常類似,修改為啟動進程和進程類里的Queue即可,但千萬不要在vscode里直接debug!在vscode里直接f5運行進程并不能啟動.
from __future__ import unicode_literals from functools import reduce import tensorflow as tf import numpy as np import warnings import argparse import skimage.io import skimage.transform import skimage import scipy.io.wavfile from multiprocessing import Process,Queue class SequenceData(): def __init__(self, path, batch_size=32): self.path = path self.batch_size = batch_size f = open(path) self.datas = f.readlines() self.L = len(self.datas) self.index = random.sample(range(self.L), self.L) self.queue = Queue(maxsize=30) self.Process_num=32 for i in range(self.Process_num): print(i,'start') ii = int(self.__len__()/self.Process_num) t = Process(target=self.f,args=(i*ii,(i+1)*ii)) t.start() def __len__(self): return self.L - self.batch_size def __getitem__(self, idx): batch_indexs = self.index[idx:(idx+self.batch_size)] batch_datas = [self.datas[k] for k in batch_indexs] img1s,img2s,audios,labels = self.data_generation(batch_datas) return img1s,img2s,audios,labels def f(self,i_l,i_h): for i in range(i_l,i_h): t = self.__getitem__(i) self.queue.put(t) def gen(self): while 1: t = self.queue.get() yield t[0],t[1],t[2],t[3] def data_generation(self, batch_datas): #數據預處理操作 return img1s,img2s,audios,labels epochs = 2 data_g = SequenceData('train_1.csv',batch_size=48) dataset = tf.data.Dataset().batch(1).from_generator(data_g.gen, output_types= (tf.float32,tf.float32,tf.float32,tf.float32)) X, y,z,w = dataset.make_one_shot_iterator().get_next() with tf.Session() as sess: tf.global_variables_initializer().run() for i in range(epochs): for j in range(int(len(data_g)/(data_g.batch_size))): face1,face2,voice, labels = sess.run([X,y,z,w]) print(face1.shape)
然后,最后實現的效果
感謝各位的閱讀!關于“Tensorflow中多線程與多進程數據加載的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。