您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關HDFS中怎么實現本地文件上傳,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
public synchronized void write(byte b[], int off, int len)
throws IOException {
if (closed) { //校驗是否關閉了,關閉了自然不應該再寫入數據了
throw new IOException("Stream closed");
}
while (len > 0) { //這里的len就是指源緩沖區剩下的未寫完的數據長度,單位byte
int remaining = BUFFER_SIZE - pos; //目的緩沖區里可以寫的字節數
int toWrite = Math.min(remaining, len); //跟需要寫的字節數比較,取較小值作為真正要寫入的字節數
System.arraycopy(b, off, outBuf, pos, toWrite); //開始復制來作為寫入到目的緩沖區操作
pos += toWrite; //更新目的緩沖區位置指針
off += toWrite; //更新源緩沖區位置指針
len -= toWrite; //更新源緩沖區剩下的內容長度
filePos += toWrite; //計算整個文件的總的已經寫入的長度(包括緩沖區里的內容)
if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
(pos == BUFFER_SIZE)) {
flush(); //這里是2個條件引起flush,一個是總長度(已寫+緩存)超過一個塊大小,
//第2個就是目的緩沖區已經滿了,都么空間寫入了,自然需要flush了。
}
}
}
//友情提醒,這里的前半段寫入是能寫多少寫多少,寫完了再判斷!
為啥有2個判斷條件?想必很多人對緩沖區滿了很好理解,因為都沒剩余空間了
而對bytesWrittenToBlock + pos >= BLOCK_SIZE可能不是很清楚
這是因為一個Block寫滿了就要另起爐灶,重新開一個Block.
flush()函數暫時不解釋,后面再解釋!
---
public synchronized void write(int b) throws IOException {
if (closed) {//仍然是校驗是否關閉
throw new IOException("Stream closed");
}
if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||
(pos >= BUFFER_SIZE)) {
flush();
}//仍然是2個條件的校驗
outBuf[pos++] = (byte) b;
filePos++;//這2句的意義在于真正的寫入到目的緩沖區里
不過為啥不把這2段調一下順序更好理解?果然思維獨特!
}
---
public synchronized void flush() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}//檢驗是否關閉,老規矩
if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {
flushData(BLOCK_SIZE - bytesWrittenToBlock);
}//如果需要新起1個Block的話,就把剩下的不足字節數先寫上
if (bytesWrittenToBlock == BLOCK_SIZE) {
endBlock();//然后關閉當前塊,新起一塊
}
flushData(pos);//對當前塊繼續寫剩下的
}
---
繼續看別的函數
在看別的函數之前,首先希望讀者先建立一個0.1.0中文件的存儲機制。
在讀取本地文件上傳到HDFS中,文件流是這樣的。
本地文件--->本地內存緩沖區Buffer--->本地文件--->上傳到遠程HDFS系統。
而本地內存緩沖區Buffer--->本地文件就是flushData做的事情,請再復習下flush函數,然后再接下來分析flushData.
PS:看代碼比寫代碼累,看代碼是了解別人的思維,寫代碼是把自己的思維實現起來。。。
private synchronized void flushData(int maxPos) throws IOException {
int workingPos = Math.min(pos, maxPos);//計算要寫入的字節數,真是多此一舉。
if (workingPos > 0) {//如果確實需要寫的話
//
// To the local block backup, write just the bytes
//
backupStream.write(outBuf, 0, workingPos);//寫入到本地文件
//注意,請認真閱讀backupStream的初始化過程,是一個本地文件。
//也就是說計劃把內存緩沖區里的內容寫到本地文件中,寫完一個block再發送給HDFS.
//聰明的讀者應該想到最后一個block的大小是<=blockSize的。
// Track position
//
bytesWrittenToBlock += workingPos;//更新寫入到block塊的字節數,
//尤其要強調,當一個塊結束后,這個變量就會重置為0,你懂的。
System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
//字節前挪移到偏移量為0的位置,方便后面IO操作,你懂得,不解釋。
pos -= workingPos;//相關變量都需要更新
}
}
---------------
接下來到了比較核心的函數endBlock(); 意思是關閉當前塊,新起一塊,下面來看看具體的代碼!
private synchronized void endBlock() throws IOException {
//
// Done with local copy
//
backupStream.close();//關閉本地文件系統的臨時文件
//
// Send it to datanode//準備發送給datanode了。
//
boolean mustRecover = true;//定義一個哨兵變量
while (mustRecover) {//需要讀取當前文件時
nextBlockOutputStream();
因為這個函數到后面才分析,所以提把背景知識補充好,這個函數
主要是初始化了一對IO流句柄,這個流是當前shell和遠程datanode
之間的TCP連接,這對IO流句柄就是 blockStream + blockReplyStream,
分別對應著輸出流和輸入流,輸出流用來輸出文件頭和文件內容,輸入流是
用來讀取響應。
InputStream in = new FileInputStream(backupFile);//既然第一行關閉了寫,
現在就可以開始讀了
try {
byte buf[] = new byte[BUFFER_SIZE];//還是局部的IO緩沖區
int bytesRead = in.read(buf);//從本地文件中讀取內容
while (bytesRead > 0) {//大于0?
blockStream.writeLong((long) bytesRead);//寫入字節數
blockStream.write(buf, 0, bytesRead);//寫入緩沖區的內容
bytesRead = in.read(buf);//繼續從本地文件中讀取
}
internalClose();//跟NameNode和DataNode的交互,表示關閉
mustRecover = false;//表示任務結束
} catch (IOException ie) {
handleSocketException(ie);
} finally {
in.close();//關閉當前文件的輸入流
}
}
//
// Delete local backup, start new one
//下面4行是從新建立起本地文件系統的文件緩沖系統,不解釋
backupFile.delete();
backupFile = newBackupFile();
backupStream = new FileOutputStream(backupFile);
bytesWrittenToBlock = 0;
}
在閱讀以上代碼之后,我個人認為如果用C語言來寫這段邏輯的話,我會直接調用sendfile來實現文件傳輸。
當然JAVA的API滯后性以及OS當時或許都不提供這種方式吧,反正現在的內核都提供了。
---------------------------------------
那么接下來分析的是函數:nextBlockOutputStream()
private synchronized void nextBlockOutputStream() throws IOException {
boolean retry = false;//不解釋
long start = System.currentTimeMillis();//當前開始時間
do {
retry = false;//重置為false
long localstart = System.currentTimeMillis();//當前開始時間
boolean blockComplete = false;//標注塊是否OK
LocatedBlock lb = null; //初始化為null
while (! blockComplete) {//如果未結束
if (firstTime) {//如果是第一次開啟一個文件
lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);//創建一個文件
} else {
lb = namenode.addBlock(src.toString(), localName);
}//增加一個block
if (lb == null) {//如果找不到
try {
Thread.sleep(400);//就沉睡400毫秒
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");
}
} catch (InterruptedException ie) {
}
} else {
blockComplete = true;//設置blockComplete為true.解釋為找到了一個block
}
}
block = lb.getBlock();//從lb中獲取block的信息
DatanodeInfo nodes[] = lb.getLocations();//從lb中獲取block要存儲的DataNode數組
//
// Connect to first DataNode in the list. Abort if this fails.
//請注意上面這句的意思:連接第一個數據節點,
//為啥?數據傳輸采用計算機組成原理的菊花鏈模式
InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());//解析
try {
s = new Socket();
s.connect(target, READ_TIMEOUT);//連接第一個DataNode
s.setSoTimeout(READ_TIMEOUT);//設置讀取時間
} catch (IOException ie) {//異常這里就不分析了
// Connection failed. Let's wait a little bit and retry
try {
if (System.currentTimeMillis() - start > 5000) {
LOG.info("Waiting to find target node: " + target);
}
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
if (firstTime) {
namenode.abandonFileInProgress(src.toString());
} else {
namenode.abandonBlock(block, src.toString());
}
retry = true;
continue;
}
//此時已經成功連接到了遠程DataNode節點,bingo!
// Xmit header info to datanode
//
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
//獲取輸出流句柄
out.write(OP_WRITE_BLOCK);//輸出行為標識
out.writeBoolean(false);//false?
block.write(out);//寫入block信息,注意:是把從namenode獲取到的block寫給DataNode
out.writeInt(nodes.length);//這一樣和下面這一行是為了寫入所有存儲及備份的DataNode
for (int i = 0; i < nodes.length; i++) {
nodes[i].write(out);//不解釋
}
out.write(CHUNKED_ENCODING);//寫CHUNKED_ENCODING
bytesWrittenToBlock = 0;//重置為0
blockStream = out;//把句柄賦值給類的局部變量供后續使用
blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));//同理,不解釋
} while (retry);
firstTime = false;//firstTime在至少有一個塊信息返回后就為false
===================================================
接下來要分析的函數是
private synchronized void internalClose() throws IOException {
blockStream.writeLong(0);//表明長度結束了
blockStream.flush();//把緩沖內容全部輸出。
long complete = blockReplyStream.readLong();//讀取響應
if (complete != WRITE_COMPLETE) {//如果不是結束
LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);
throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);
}
LocatedBlock lb = new LocatedBlock();//創建一個新對象
lb.readFields(blockReplyStream);//根據響應流來賦值
namenode.reportWrittenBlock(lb);//向namenode報告寫入成功
s.close();//關閉此流
s = null;
}
================
最后就是close函數
public synchronized void close() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}//校驗是否關閉了
flush();//盡可能的輸出內容
if (filePos == 0 || bytesWrittenToBlock != 0) {
try {
endBlock();//結束一個塊
} catch (IOException e) {
namenode.abandonFileInProgress(src.toString());//拋棄此file
throw e;
}
}
backupStream.close();//關閉流
backupFile.delete();//刪除文件
if (s != null) {
s.close();//不解釋
s = null;
}
super.close();
long localstart = System.currentTimeMillis();
boolean fileComplete = false;
while (! fileComplete) {//循環報告文件寫完了
fileComplete = namenode.complete(src.toString(), clientName.toString());
if (!fileComplete) {
try {
Thread.sleep(400);
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Could not complete file, retrying...");
}
} catch (InterruptedException ie) {
}
}
}
closed = true;
}
以上就是HDFS中怎么實現本地文件上傳,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。