亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用Java?API操作Hdfs

發布時間:2022-08-24 15:47:31 來源:億速云 閱讀:176 作者:iii 欄目:開發技術

這篇文章主要介紹了怎么使用Java API操作Hdfs的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么使用Java API操作Hdfs文章都會有所收獲,下面我們一起來看看吧。

1.遍歷當前目錄下所有文件與文件夾

可以使用listStatus方法實現上述需求。
listStatus方法簽名如下

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   * 
   * @param f given path
   * @return the statuses of the files/directories in the given patch
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
                                                         IOException;

可以看出listStatus只需要傳入參數Path即可,返回的是一個FileStatus的數組。
而FileStatus包含有以下信息

/** Interface that represents the client side information for a file.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;
  private Path symlink;
  ....

從FileStatus中不難看出,包含有文件路徑,大小,是否是目錄,block_replication, blocksize…等等各種信息。

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HdfsOperation {
	
	val logger = LoggerFactory.getLogger(this.getClass)
	
	def tree(sc: SparkContext, path: String) : Unit = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val fsPath = new Path(path)
		val status = fs.listStatus(fsPath)
		for(filestatus:FileStatus <- status) {
			logger.error("getPermission is: {}", filestatus.getPermission)
			logger.error("getOwner is: {}", filestatus.getOwner)
			logger.error("getGroup is: {}", filestatus.getGroup)
			logger.error("getLen is: {}", filestatus.getLen)
			logger.error("getModificationTime is: {}", filestatus.getModificationTime)
			logger.error("getReplication is: {}", filestatus.getReplication)
			logger.error("getBlockSize is: {}", filestatus.getBlockSize)
			if (filestatus.isDirectory) {
				val dirpath = filestatus.getPath.toString
				logger.error("文件夾名字為: {}", dirpath)
				tree(sc, dirpath)
			} else {
				val fullname = filestatus.getPath.toString
				val filename = filestatus.getPath.getName
				logger.error("全部文件名為: {}", fullname)
				logger.error("文件名為: {}", filename)
			}
		}
	}
}

如果判斷fileStatus是文件夾,則遞歸調用tree方法,達到全部遍歷的目的。

2.遍歷所有文件

上面的方法是遍歷所有文件以及文件夾。如果只想遍歷文件,可以使用listFiles方法。

	def findFiles(sc: SparkContext, path: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val fsPath = new Path(path)
		val files = fs.listFiles(fsPath, true)
		while(files.hasNext) {
			val filestatus = files.next()
			val fullname = filestatus.getPath.toString
			val filename = filestatus.getPath.getName
			logger.error("全部文件名為: {}", fullname)
			logger.error("文件名為: {}", filename)
			logger.error("文件大小為: {}", filestatus.getLen)
		}
	}
  /**
   * List the statuses and block locations of the files in the given path.
   * 
   * If the path is a directory, 
   *   if recursive is false, returns files in the directory;
   *   if recursive is true, return files in the subtree rooted at the path.
   * If the path is a file, return the file's status and block locations.
   * 
   * @param f is the path
   * @param recursive if the subdirectories need to be traversed recursively
   *
   * @return an iterator that traverses statuses of the files
   *
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public RemoteIterator<LocatedFileStatus> listFiles(
      final Path f, final boolean recursive)
  throws FileNotFoundException, IOException {
  ...

從源碼可以看出,listFiles 返回一個可迭代的對象RemoteIterator<LocatedFileStatus>,而listStatus返回的是個數組。同時,listFiles返回的都是文件。

3.創建文件夾

	def mkdirToHdfs(sc: SparkContext, path: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val result = fs.mkdirs(new Path(path))
		if (result) {
			logger.error("mkdirs already success!")
		} else {
			logger.error("mkdirs had failed!")
		}
	}

4.刪除文件夾

	def deleteOnHdfs(sc: SparkContext, path: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		val result = fs.delete(new Path(path), true)
		if (result) {
			logger.error("delete already success!")
		} else {
			logger.error("delete had failed!")
		}
	}

5.上傳文件

	def uploadToHdfs(sc: SparkContext, localPath: String, hdfsPath: String): Unit = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath))
		fs.close()
	}

6.下載文件

	def downloadFromHdfs(sc: SparkContext, localPath: String, hdfsPath: String) = {
		val fs = FileSystem.get(sc.hadoopConfiguration)
		fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath))
		fs.close()
	}

關于“怎么使用Java API操作Hdfs”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么使用Java API操作Hdfs”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

成安县| 岱山县| 织金县| 合山市| 灵石县| 米泉市| 万源市| 茂名市| 武汉市| 河北省| 长顺县| 阜宁县| 康乐县| 宿迁市| 湾仔区| 宜黄县| 左权县| 卢氏县| 开江县| 辽源市| 南漳县| 渭南市| 嘉义县| 桐乡市| 玉山县| 奉贤区| 嵊州市| 沧州市| 台南市| 高雄市| 洛川县| 叙永县| 宜丰县| 勃利县| 同心县| 英德市| 疏附县| 临沂市| 安西县| 华容县| 武安市|