您好,登錄后才能下訂單哦!
這段時間看了看工作室的工具庫的下載組件,發現其存在一些問題:
1.下載核心邏輯有 bug,在暫停下載或下載失敗等情況時有概率無法順利完成下載。
2.雖然原來的設計是采用多線程斷點續傳的設計,但打了一下日志發現其實下載任務都是在同一個線程下串行執行,并沒有起到加快下載速度的作用。
考慮到原來的代碼并不復雜,因此對這部分下載組件進行了重寫。這里記錄一下里面的多線程斷點續傳功能的實現。
請查看完整的PDF版
(更多完整項目下載。未完待續。源碼。圖文知識后續上傳github。)
可以點擊關于我聯系我獲取完整PDF
(VX:mm14525201314)
首先我們談一談,多線程下載的意義。
在日常的場景下,網絡中不可能只有下載方與服務器之間這樣一條連接,為了避免在這樣的場景下的網絡擁塞,TCP 協議通過調節窗口的大小來避免出現擁塞,但這個窗口的大小可能沒辦法達到我們預期的效果:充分利用我們的帶寬。因此我們可以采用多個 TCP 連接的形式來提高我們帶寬的利用率,從而加快下載速度。
打個比喻就是我們要從一個水缸中用抽水機通過水管抽水,由于管子的直徑等等的限制,我們單條管子無法完全利用我們的抽水機的抽水動力。因此我們就將這些抽水的任務分成了多份,分攤到多個管子上,這樣就可以更充分的利用我們的抽水機動力,從而提高抽水的速度。
因此,我們使用多線程下載的主要意義就是——提高下載速度。
前面提到了我們主要的目的是將一個總的下載任務分攤到多個子任務中,比如假設我們用 5 個線程下載這個文件,那么我們就可以對一個長度為 N 的任務進行如下圖的均分:
但真實場景下往往 N 都不是剛好為 5 的倍數的,因此對于最后一個任務還需要加上剩余的任務量,也就是 N/5+N%5。
上面的任務分配我們已經了解了,看起來很理想,但有一個問題,我們如何實現向服務器只請求這個文件的某一段而不是全部呢?
我們可以通過在請求頭中加入 Range 字段來指定請求的范圍,從而實現指定某一段的數據。
如:RANGE bytes=10000-19999
就指定了 10000-19999 這段字節的數據
所以我們的核心思想就是通過它拿到文件對應字節段的 InputStream,然后對它讀取并寫入文件。
下面再講講文件寫入問題,由于我們是多線程下載,因此文件并不是每次都是從前往后一個個字節寫入的,隨時可能在文件的任何一個地方寫入數據。因此我們需要能夠在文件的指定位置寫入數據。這里我們用到了RandomAccessFile
來實現這個功能。
RandomAccessFile
是一個隨機訪問文件類,同時整合了 FileOutputStream
和 FileInputStream
,支持從文件的任何字節處讀寫數據。通過它我們就可以在文件的任何字節處寫入數據。
接下來簡單講講我們這里是如何使用 RandomAccessFile
的。我們對于每個子任務來說都有一個開始和結束的位置。每個任務都可以通過 RandomAccessFile::seek
跳轉到文件的對應字節位置,然后從該位置開始讀取 InputStream
并寫入。
這樣,就實現了不同線程對文件的隨機寫入。
由于我們在真正開始下載之前,我們需要先將任務分配到各個線程,因此我們需要先了解到文件的大小。
為了獲取到文件的大小,我們用到 Response Headers
中的 Content-Length
字段。
如下圖所示,可以看到,打開該下載請求的鏈接后,Response Headers
中包含了我們需要的 Content-Length
,也就是該文件的大小,單位是字節。
對于多個子任務,我們如何實現它們的斷點續傳呢?
其實原理很簡單,只需要保證每個子任務的下載進度能夠被即時地記錄即可。這樣繼續下載時只需要讀取這些下載記錄,從上次下載結束的位置開始下載即可。
它的實現有很多方式,只要能做到數據持久化即可。這里我使用的是數據庫來實現。
這樣,我們的子任務需要擁有一些必要的信息
completedSize
:當前下載完成大小taskSize
:子任務總大小startPos
:子任務開始位置currentPos
:子任務進行到的位置endPos
:子任務結束位置通過這些信息,我們就能夠記錄子任務的下載進度從而恢復我們之前的下載,實現斷點續傳。
下面我們用代碼來實現這樣一個多線程下載功能。
首先,我們定義一下下載中的各個狀態:
public class DownloadStatus {
public static final int IDLE = 233; // 空閑,默認狀態
public static final int COMPLETED = 234; // 完成
public static final int DOWNLOADING = 235; // 下載中
public static final int PAUSE = 236; // 暫停
public static final int ERROR = 237; // 出錯
}
可以看到,這里定義了如上的五種狀態。
這里需要用到如數據庫及 HTTP 請求的功能,我們這里定義其接口如下,具體實現各位可以根據需要自己實現:
public interface DownloadDbHelper {
/**
* 從數據庫中刪除子任務記錄
* @param task 子任務記錄
*/
void delete(SubDownloadTask task);
/**
* 向數據庫中插入子任務記錄
* @param task 子任務記錄
*/
void insert(SubDownloadTask task);
/**
* 在數據庫中更新子任務記錄
* @param task 子任務記錄
*/
void update(SubDownloadTask task);
/**
* 獲取所有指定Task下的子任務記錄
* @param taskTag Task的Tag
* @return 子任務記錄
*/
List<SubDownloadTask> queryByTaskTag(String taskTag);
}
public interface DownloadHttpHelper {
/**
* 獲取文件總長度
* @param url 下載url
* @param callback 獲取文件長度CallBack
*/
void getTotalSize(String url, NetCallback<Long> callback);
/**
* 獲取InputStream
* @param url 下載url
* @param start 開始位置
* @param end 結束位置
* @param callback 獲取字節流的CallBack
*/
void getStreamByRange(String url, long start, long end, NetCallback<InputStream> callback);
}
我們先從上到下,從子任務開始實現。在我的設計中,它具有如下的成員變量:
@Entity
public class SubDownloadTask implements Runnable {
public static final int BUFFER_SIZE = 1024 * 1024;
private static final String TAG = SubDownloadTask.class.getSimpleName();
@Id
private Long id;
private String url; // 文件下載的 url
private String taskTag; // 父任務的 Tag
private long taskSize; // 子任務大小
private long completedSize; // 子任務完成大小
private long startPos; // 開始位置
private long currentPos; // 當前位置
private long endPos; // 結束位置
private volatile int status; // 當前下載狀態
@Transient
private SubDownloadListener listener; // 子任務下載監聽,主要用于提示父任務
@Transient
private File saveFile; // 要保存到的文件
...
}
由于這里的數據庫的操作是用 GreenDao
實現,因此這里有一些相關注解,各位可以忽略。
InputStream
獲取可以看到,子任務是一個 Runnable,我們可以通過其 run 方法開始下載,這樣就可以通過如 ExecutorService 來開啟多個線程執行子任務。
我們看到其 run 方法:
@Override
public void run() {
status = DownloadStatus.DOWNLOADING;
DownloadManager.getInstance()
.getHttpHelper()
.getStreamByRange(url, currentPos, endPos, new NetCallback<InputStream>() {
@Override
public void onResult(InputStream inputStream) {
listener.onSubStart();
writeFile(inputStream);
}
@Override
public void onError(String message) {
listener.onSubError("文件流獲取失敗");
status = DownloadStatus.ERROR;
}
});
}
可以看到,我們獲取了其從 currentPos
到 endPos
端的字節流,通過其 Response Body 拿到了它的 InputStream
,然后調用了 writeFile(InputStream)
方法進行文件的寫入。
文件寫入
接下來看到 writeFile
方法:
private void writeFile(InputStream in) {
try {
RandomAccessFile file = new RandomAccessFile(saveFile, "rwd"); // 通過 saveFile 建立RandomAccessFile
file.seek(currentPos); // 跳轉到對應位置
byte[] buffer = new byte[BUFFER_SIZE];
while (true) {
// 循環讀取 InputStream,直到暫停或讀取結束
if (status != DownloadStatus.DOWNLOADING) {
// 狀態不為 DOWNLOADING,停止下載
break;
}
int offset = in.read(buffer, 0, BUFFER_SIZE);
if (offset == -1) {
// 讀取不到數據,說明讀取結束
break;
}
// 將讀取到的數據寫入文件
file.write(buffer, 0, offset);
// 下載數據并在數據庫中更新
currentPos += offset;
completedSize += offset;
DownloadManager.getInstance()
.getDbHelper()
.update(this);
// 通知父任務下載進度
listener.onSubDownloading(offset);
}
if(status == DownloadStatus.DOWNLOADING) {
// 下載完成
status = DownloadStatus.COMPLETED;
// 通知父任務下載完成
listener.onSubComplete(completedSize);
}
file.close();
in.close();
} catch (IOException e) {
e.printStackTrace();
listener.onSubError("文件下載失敗");
status = DownloadStatus.ERROR;
resetTask();
}
}
具體流程可以看代碼中的注釋。可以看到,子任務實際上就是循環讀取 InputStream
,并寫入文件,同時將下載進度同步到數據庫。
父任務也就是我們具體的下載任務,我們同樣先看到成員變量:
public class DownloadTask implements SubDownloadListener {
private static final String TAG = DownloadTask.class.getSimpleName();
private String tag; // 下載任務的 Tag,用于區分不同下載任務
private String url; // 下載 url
private String savePath; // 保存路徑
private String fileName; // 保存文件名
private DownloadListener listener; // 下載監聽
private long completeSize; // 下載完成大小
private long totalSize; // 下載任務總大小
private int status; // 當前下載進度
private int threadNum; // 線程數(由外部設置的每個任務的下載線程數)
private File file; // 保存文件
private List<SubDownloadTask> subTasks; // 子任務列表
private ExecutorService mExecutorService; // 線程池,用于執行子任務
...
}
對于一個下載任務,可以通過 download 方法開始執行:
public void download() {
listener.onStart();
subTasks = querySubTasks();
status = DownloadStatus.DOWNLOADING;
if (subTasks.isEmpty()) {
// 是新任務
downloadNewTask();
} else if (subTasks.size() == threadNum) {
// 不是新任務
downloadExistTask();
} else {
// 不是新任務,但下載線程數有誤
listener.onError("斷點數據有誤");
resetTask();
}
}
可以看到,我們先將子任務列表從數據庫中讀取出來。
downloadNewTask
方法。downloadExistTask
方法。如果子任務列表大小不等于線程數,說明當前的下載記錄已不可用,于是重置下載任務,從新下載。
我們先看到 downloadNewTask
方法:
DownloadManager.getInstance()
.getHttpHelper()
.getTotalSize(url, new NetCallback<Long>() {
@Override
public void onResult(Long total) {
completeSize = 0L;
totalSize = total;
initSubTasks();
startAsyncDownload();
}
@Override
public void onError(String message) {
error("獲取文件長度失敗");
}
});
可以看到,獲取到總長度后,通過調用 initSubTasks
方法,對子任務列表進行了初始化(計算子任務長度等),然后調用了 startAsyncDownload
方法后通過 ExecutorService
運行子任務進入子任務進行下載。
我們看到 initSubTasks
方法:
private void initSubTasks() {
long averageSize = totalSize / threadNum;
for (int taskIndex = 0; taskIndex < threadNum; taskIndex++) {
long taskSize = averageSize;
if (taskIndex == threadNum - 1) {
// 最后一個任務,則 size 還需要加入剩余量
taskSize += totalSize % threadNum;
}
long start = 0L;
int index = taskIndex;
while (index > 0) {
start += subTasks.get(index - 1).getTaskSize();
index--;
}
long end = start + taskSize - 1; // 注意這里
SubDownloadTask subTask = new SubDownloadTask();
subTask.setUrl(url);
subTask.setStatus(DownloadStatus.IDLE);
subTask.setTaskTag(tag);
subTask.setCompletedSize(0);
subTask.setTaskSize(taskSize);
subTask.setStartPos(start);
subTask.setCurrentPos(start);
subTask.setEndPos(end);
subTask.setSaveFile(file);
subTask.setListener(this);
DownloadManager.getInstance()
.getDbHelper()
.insert(subTask);
subTasks.add(subTask);
}
}
可以看到就是計算每個任務的大小及開始及結束點的位置,這里要注意的是 endPos 需要 -1,否則各個任務的下載位置會重疊,并且最后一個任務會多下載一個字節導致如文件損壞等影響。具體原因就是比如一個大小為 500 的文件,則應當是 0-499 而不是 0-500。
接下來我們看看 downloadExistTask
方法:
private void downloadExistTask() {
// 不是新任務,且下載線程數無誤,計算已下載大小
completeSize = countCompleteSize();
totalSize = countTotalSize();
startAsyncDownload();
}
這里其實很簡單,遍歷子任務列表計算已下載量及總任務量,并調用 startAsyncDownload 開始多線程下載。
具體執行子任務我們可以看到 startAsyncDownload
方法:
private void startAsyncDownload() {
for (SubDownloadTask subTask : subTasks) {
if (subTask.getCompletedSize() < subTask.getTaskSize()) {
// 只下載沒有下載結束的子任務
mExecutorService.execute(subTask);
}
}
}
可以看到,這里其實只是通過 ExecutorService 執行對應子任務(Runnable)而已。
####暫停功能
我們接下來看到 pause 方法:
public void pause() {
stopAsyncDownload();
status = DownloadStatus.PAUSE;
listener.onPause();
}
可以看到,這里只是調用了 stopAsyncDownload
方法停止子任務。
看到 stopAsyncDownload
方法:
private void stopAsyncDownload() {
for (SubDownloadTask subTask : subTasks) {
if (subTask.getStatus() != DownloadStatus.COMPLETED) {
// 下載完成的不再取消
subTask.cancel();
}
}
}
可以看到,調用了子任務的 cancel
方法。
繼續看到子任務的 cancel
方法:
void cancel() {
status = DownloadStatus.PAUSE;
listener.onSubCancel();
}
這里很簡單,僅僅是將下載狀態設置為了 PAUSE,這樣在寫入文件的下一次 while 循環時便會中止循環從而結束 Runnable
的執行。
看到 cancel
方法:
public void cancel() {
stopAsyncDownload();
resetTask();
listener.onCancel();
}
可以看到和暫停的邏輯差不多,只是在暫停后還需要對子任務重置從而使得下次下載從頭開始。
前面提到,外部可以通過 DownloadListener
監聽下載的進度,下面是 DownloadListener
接口的定義:
public interface DownloadListener {
default void onStart() {}
default void onDownloading(long progress, long total) {}
default void onPause() {}
default void onCancel() {}
default void onComplete() {}
default void onError(String message) {}
}
我們實時的下載進度其實是在子任務的保存文件過程中才能體現出來的,同樣,子任務的下載失敗也需要通知到 DownloadListener
,這是怎么做到的呢?
前面提到了,我們還定義了一個 SubDownloadListener
,其監聽者就是子任務的父任務。通過監聽我們可以將子任務狀態反饋到父任務,父任務再根據具體情況反饋數據給 DownloadListener
。
public interface SubDownloadListener {
void onSubStart();
void onSubDownloading(int offset);
void onSubCancel();
void onSubComplete(long completeSize);
void onSubError(String message);
}
比如之前看到,每次下載失敗我們都會調用 onSubError
,每次讀取 offset 的數據都會調用 onSubDownload(offset)
,每個任務下載失敗都會調用 onSubComplete(completeSize)
。這樣,我們子任務的下載狀態就成功返回給了上層。
我們接著看看上層是如何處理的:
@Override
public void onSubStart() {}
@Override
public void onSubDownloading(int offset) {
synchronized (this) {
completeSize = completeSize + offset;
listener.onDownloading(completeSize, totalSize);
}
}
@Override
public void onSubCancel() {}
@Override
public void onSubComplete(long completeSize) {
checkComplete();
}
@Override
public void onSubError(String message) {
error(message);
}
可以看到,每次下載到一段數據,它都會把數據量返回上來,此時 completeSize
就加上了對應的 offset,然后再將新的 completeSize
通知給監聽者,這樣就實現了下載進度的監聽。這里之所以加鎖是因為會有多個線程(子任務線程)對 completeSize
進行操作,加鎖保證線程安全。
而每次有子任務完成,它都會調用 checkComplete
方法檢查是否下載完成,若每個子任務都下載完成,則說明任務下載完成,然后通知監聽者。
同樣的,每次子任務出現錯誤,都會通知監聽者出現錯誤,并做一些錯誤情況下的處理。
到這里,這篇文章就結束了,我們成功實現了多線程斷點續傳下載功能。基于這個原理,我們可以做一些上層的封裝實現一個文件下載框架。
請查看完整的PDF版
(更多完整項目下載。未完待續。源碼。圖文知識后續上傳github。)
可以點擊關于我聯系我獲取完整PDF
(VX:mm14525201314)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。