亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

HDFS中怎么實現本地文件上傳

發布時間:2021-06-26 14:29:37 來源:億速云 閱讀:775 作者:Leah 欄目:云計算

本篇文章給大家分享的是有關HDFS中怎么實現本地文件上傳,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

public synchronized void write(byte b[], int off, int len)

        throws IOException {

            if (closed) { //校驗是否關閉了,關閉了自然不應該再寫入數據了

                throw new IOException("Stream closed");

            }

            while (len > 0) { //這里的len就是指源緩沖區剩下的未寫完的數據長度,單位byte

              int remaining = BUFFER_SIZE - pos; //目的緩沖區里可以寫的字節數

              int toWrite = Math.min(remaining, len); //跟需要寫的字節數比較,取較小值作為真正要寫入的字節數

              System.arraycopy(b, off, outBuf, pos, toWrite); //開始復制來作為寫入到目的緩沖區操作

              pos += toWrite; //更新目的緩沖區位置指針

              off += toWrite; //更新源緩沖區位置指針

              len -= toWrite; //更新源緩沖區剩下的內容長度

              filePos += toWrite; //計算整個文件的總的已經寫入的長度(包括緩沖區里的內容)

              if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||

                  (pos == BUFFER_SIZE)) {

                flush(); //這里是2個條件引起flush,一個是總長度(已寫+緩存)超過一個塊大小,

                                              //第2個就是目的緩沖區已經滿了,都么空間寫入了,自然需要flush了。

              }

            }

        }

 //友情提醒,這里的前半段寫入是能寫多少寫多少,寫完了再判斷!

為啥有2個判斷條件?想必很多人對緩沖區滿了很好理解,因為都沒剩余空間了

而對bytesWrittenToBlock + pos >= BLOCK_SIZE可能不是很清楚

這是因為一個Block寫滿了就要另起爐灶,重新開一個Block.

flush()函數暫時不解釋,后面再解釋!

---

 public synchronized void write(int b) throws IOException {

            if (closed) {//仍然是校驗是否關閉

                throw new IOException("Stream closed");

            }

            if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||

                (pos >= BUFFER_SIZE)) {

                flush();

            }//仍然是2個條件的校驗

            outBuf[pos++] = (byte) b;

            filePos++;//這2句的意義在于真正的寫入到目的緩沖區里

                         不過為啥不把這2段調一下順序更好理解?果然思維獨特!

        }

---

 public synchronized void flush() throws IOException {

            if (closed) {

                throw new IOException("Stream closed");

            }//檢驗是否關閉,老規矩

            if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {

                flushData(BLOCK_SIZE - bytesWrittenToBlock);

            }//如果需要新起1個Block的話,就把剩下的不足字節數先寫上

            if (bytesWrittenToBlock == BLOCK_SIZE) {

                endBlock();//然后關閉當前塊,新起一塊

            }

            flushData(pos);//對當前塊繼續寫剩下的

        } 

---

繼續看別的函數

在看別的函數之前,首先希望讀者先建立一個0.1.0中文件的存儲機制。

在讀取本地文件上傳到HDFS中,文件流是這樣的。

本地文件--->本地內存緩沖區Buffer--->本地文件--->上傳到遠程HDFS系統。

而本地內存緩沖區Buffer--->本地文件就是flushData做的事情,請再復習下flush函數,然后再接下來分析flushData.

PS:看代碼比寫代碼累,看代碼是了解別人的思維,寫代碼是把自己的思維實現起來。。。 

private synchronized void flushData(int maxPos) throws IOException {

            int workingPos = Math.min(pos, maxPos);//計算要寫入的字節數,真是多此一舉。

            if (workingPos > 0) {//如果確實需要寫的話

                //

                // To the local block backup, write just the bytes

                //

                backupStream.write(outBuf, 0, workingPos);//寫入到本地文件

                //注意,請認真閱讀backupStream的初始化過程,是一個本地文件。

                //也就是說計劃把內存緩沖區里的內容寫到本地文件中,寫完一個block再發送給HDFS.

                //聰明的讀者應該想到最后一個block的大小是<=blockSize的。

                // Track position

                //

                bytesWrittenToBlock += workingPos;//更新寫入到block塊的字節數,

                //尤其要強調,當一個塊結束后,這個變量就會重置為0,你懂的。

                System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);

                //字節前挪移到偏移量為0的位置,方便后面IO操作,你懂得,不解釋。

                pos -= workingPos;//相關變量都需要更新

            }

        }

---------------

接下來到了比較核心的函數endBlock(); 意思是關閉當前塊,新起一塊,下面來看看具體的代碼!

private synchronized void endBlock() throws IOException {

            //

            // Done with local copy

            //

            backupStream.close();//關閉本地文件系統的臨時文件

            //

            // Send it to datanode//準備發送給datanode了。

            //

            boolean mustRecover = true;//定義一個哨兵變量

            while (mustRecover) {//需要讀取當前文件時

                nextBlockOutputStream();

           因為這個函數到后面才分析,所以提把背景知識補充好,這個函數

           主要是初始化了一對IO流句柄,這個流是當前shell和遠程datanode

           之間的TCP連接,這對IO流句柄就是 blockStream + blockReplyStream,

           分別對應著輸出流和輸入流,輸出流用來輸出文件頭和文件內容,輸入流是

           用來讀取響應。 

                InputStream in = new FileInputStream(backupFile);//既然第一行關閉了寫,

                現在就可以開始讀了

                try {

                    byte buf[] = new byte[BUFFER_SIZE];//還是局部的IO緩沖區

                    int bytesRead = in.read(buf);//從本地文件中讀取內容

                    while (bytesRead > 0) {//大于0?

                        blockStream.writeLong((long) bytesRead);//寫入字節數

                        blockStream.write(buf, 0, bytesRead);//寫入緩沖區的內容

                        bytesRead = in.read(buf);//繼續從本地文件中讀取

                    }

                    internalClose();//跟NameNode和DataNode的交互,表示關閉

                    mustRecover = false;//表示任務結束

                } catch (IOException ie) {

                    handleSocketException(ie);

                } finally {

                  in.close();//關閉當前文件的輸入流

                }

            }

            //

            // Delete local backup, start new one

            //下面4行是從新建立起本地文件系統的文件緩沖系統,不解釋

            backupFile.delete();

            backupFile = newBackupFile();

            backupStream = new FileOutputStream(backupFile);

            bytesWrittenToBlock = 0;

        }

在閱讀以上代碼之后,我個人認為如果用C語言來寫這段邏輯的話,我會直接調用sendfile來實現文件傳輸。

當然JAVA的API滯后性以及OS當時或許都不提供這種方式吧,反正現在的內核都提供了。

---------------------------------------

 那么接下來分析的是函數:nextBlockOutputStream()

private synchronized void nextBlockOutputStream() throws IOException {

            boolean retry = false;//不解釋

            long start = System.currentTimeMillis();//當前開始時間

            do {

                retry = false;//重置為false 

                long localstart = System.currentTimeMillis();//當前開始時間

                boolean blockComplete = false;//標注塊是否OK

                LocatedBlock lb = null;    //初始化為null          

                while (! blockComplete) {//如果未結束

                    if (firstTime) {//如果是第一次開啟一個文件

                        lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);//創建一個文件 

                    } else {

                        lb = namenode.addBlock(src.toString(), localName);

                    }//增加一個block

                    if (lb == null) {//如果找不到

                        try {

                            Thread.sleep(400);//就沉睡400毫秒

                            if (System.currentTimeMillis() - localstart > 5000) {

                                LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");

                            }

                        } catch (InterruptedException ie) {

                        }

                    } else {

                        blockComplete = true;//設置blockComplete為true.解釋為找到了一個block

                    }

                }

                block = lb.getBlock();//從lb中獲取block的信息

                DatanodeInfo nodes[] = lb.getLocations();//從lb中獲取block要存儲的DataNode數組

                //

                // Connect to first DataNode in the list.  Abort if this fails.

                //請注意上面這句的意思:連接第一個數據節點,

                //為啥?數據傳輸采用計算機組成原理的菊花鏈模式

                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());//解析

                try {

                    s = new Socket();

                    s.connect(target, READ_TIMEOUT);//連接第一個DataNode

                    s.setSoTimeout(READ_TIMEOUT);//設置讀取時間

                } catch (IOException ie) {//異常這里就不分析了

                    // Connection failed.  Let's wait a little bit and retry

                    try {

                        if (System.currentTimeMillis() - start > 5000) {

                            LOG.info("Waiting to find target node: " + target);

                        }

                        Thread.sleep(6000);

                    } catch (InterruptedException iex) {

                    }

                    if (firstTime) {

                        namenode.abandonFileInProgress(src.toString());

                    } else {

                        namenode.abandonBlock(block, src.toString());

                    }

                    retry = true;

                    continue;

                }

                //此時已經成功連接到了遠程DataNode節點,bingo!

                // Xmit header info to datanode

                //

                DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

//獲取輸出流句柄

                out.write(OP_WRITE_BLOCK);//輸出行為標識

                out.writeBoolean(false);//false?

                block.write(out);//寫入block信息,注意:是把從namenode獲取到的block寫給DataNode

                out.writeInt(nodes.length);//這一樣和下面這一行是為了寫入所有存儲及備份的DataNode

                for (int i = 0; i < nodes.length; i++) {

                    nodes[i].write(out);//不解釋

                }

                out.write(CHUNKED_ENCODING);//寫CHUNKED_ENCODING

                bytesWrittenToBlock = 0;//重置為0

                blockStream = out;//把句柄賦值給類的局部變量供后續使用

                blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));//同理,不解釋

            } while (retry);

            firstTime = false;//firstTime在至少有一個塊信息返回后就為false

=================================================== 

接下來要分析的函數是

private synchronized void internalClose() throws IOException {

            blockStream.writeLong(0);//表明長度結束了

            blockStream.flush();//把緩沖內容全部輸出。

            long complete = blockReplyStream.readLong();//讀取響應

            if (complete != WRITE_COMPLETE) {//如果不是結束

                LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);

                throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);

            }

            LocatedBlock lb = new LocatedBlock();//創建一個新對象

            lb.readFields(blockReplyStream);//根據響應流來賦值

            namenode.reportWrittenBlock(lb);//向namenode報告寫入成功

            s.close();//關閉此流

            s = null;

        }

================

最后就是close函數

public synchronized void close() throws IOException {

            if (closed) {

                throw new IOException("Stream closed");

            }//校驗是否關閉了

            flush();//盡可能的輸出內容

            if (filePos == 0 || bytesWrittenToBlock != 0) {

              try {

                endBlock();//結束一個塊

              } catch (IOException e) {

                namenode.abandonFileInProgress(src.toString());//拋棄此file

                throw e;

              }

            }

            backupStream.close();//關閉流

            backupFile.delete();//刪除文件

            if (s != null) {

                s.close();//不解釋

                s = null;

            }

            super.close();

            long localstart = System.currentTimeMillis();

            boolean fileComplete = false;

            while (! fileComplete) {//循環報告文件寫完了

                fileComplete = namenode.complete(src.toString(), clientName.toString());

                if (!fileComplete) {

                    try {

                        Thread.sleep(400);

                        if (System.currentTimeMillis() - localstart > 5000) {

                            LOG.info("Could not complete file, retrying...");

                        }

                    } catch (InterruptedException ie) {

                    }

                }

            }

            closed = true;

        }

以上就是HDFS中怎么實現本地文件上傳,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

泰安市| 额济纳旗| 张家口市| 夏邑县| 长垣县| 冀州市| 白玉县| 绥宁县| 偏关县| 太白县| 临颍县| 宕昌县| 越西县| 望奎县| 博爱县| 青田县| 日喀则市| 黄梅县| 天等县| 和静县| 永安市| 石台县| 北辰区| 奇台县| 沙田区| 安宁市| 株洲县| 镇江市| 通城县| 太仆寺旗| 康马县| 阳新县| 红安县| 诏安县| 酉阳| 新田县| 横峰县| 福贡县| 汤原县| 保德县| 莱州市|