大文件分片上传实战与优化(前端Part)
1. 需求分析
1.1 分析
实现大文件上传, 其中包括以下功能
秒传: 文件已存在, 直接给前端返回文件 url
- 记录文件的 hash 与元数据到数据库中
- 上传文件前先计算 hash 和获取文件元数据请求接口进行比对
- 若比对成功则说明文件已存在, 直接返回前端文件 url
断点续传: 上传过程意外中断, 下次上传时不需要从头上传整个文件
- 前端将文件分片上传, 后端接收分片然后进行合并
- 上传分片前先请求接口查询需要上传的分片即实现断点续传
1.2 优化与亮点
- 实现了基于文件真实上传进度的进度条
- 实现了可控制 Promise 并发数量的 PromisePool
- 实现了基于 WebWorker 的 WorkerPool / ThreadPool
- 解决了前端计算大文件 hash 速度过慢的痛点, 最终实现 5GB 文件计算 Hash 仅需 3.5 秒
- 解决了 Node 中合并文件分片导致文件损坏的问题 (思路: 在 Minio 中做合并而不是在 Node 里合并) (详见后续文章)
1.3 技术选型
具体使用什么技术栈并不重要, 如果你使用的是 Vue/React 同样适用
前端: Angular + NG-Zorro
后端: Nest.js + Prisma + MySQL + Minio(文件存储)
(OOP 的大胜利)
2. 实现文件切片
2.1 目标
就是将文件按指定的切片大小进行切片, 最终拿到文件的 ArrayBuffer 数组用于上传和分片 Hash 计算
2.2 实现
传入文件然后返回文件切片后的 Blob 数组, 不过 Blob 数组不能直接用于计算分片 hash, 还需要将它们转成 ArrayBuffer 数组
1
2 * 分割文件
3 * @param file
4 * @param baseSize 默认分块大小为 1MB
5 * @private
6 */
7function sliceFile(file: File, baseSize = 1): Blob[] {
8 const chunkSize = baseSize * 1024 * 1024
9 const chunks: Blob[] = []
10 let startPos = 0
11 while (startPos < file.size) {
12 chunks.push(file.slice(startPos, startPos + chunkSize))
13 startPos += chunkSize
14 }
15 return chunks
16}
可以使用 FileReader 转换, 可以直接转换, 二者没有性能差距
区别只是 FileReader API 的兼容性可能会更好一点
以下两种实现二选一即可
1
2 * 将 File 转成 ArrayBuffer
3 * 注意: Blob 无法直接移交到 Worker 中, 所以需要放到主线程中执行
4 * @param chunks
5 * @private
6 */
7async function getArrayBufFromBlobs(chunks: Blob[]): Promise<ArrayBuffer[]> {
8 async function readAsArrayBuffer(file: Blob) {
9 return new Promise<ArrayBuffer>((rs) => {
10 const fileReader = new FileReader()
11 fileReader.onload = (e) => rs(e.target!.result as ArrayBuffer)
12 fileReader.readAsArrayBuffer(file)
13 })
14 }
15 return await Promise.all(chunks.map((chunk: Blob) => readAsArrayBuffer(chunk)))
16}
17
18
19 * 功能同上但语法更简洁
20 * @param chunks
21 */
22async function getArrayBufFromBlobsV2(chunks: Blob[]): Promise<ArrayBuffer[]> {
23 return Promise.all(chunks.map(chunk => chunk.arrayBuffer()))
24}
至此已实现了前端文件切片
2.3 踩坑
有些文章将可以把切片过程放到 WebWorker 中, 以避免阻塞主线程
实际情况是:
切片过程不会消耗太久时间, 其中主要是 IO 瓶颈
而 Blob[] 转 ArrayBuffer[] 的过程是基于 Promise 的, 这并不会阻塞主线程
注意 ! ! !
-
如果你将这个过程放到 Worker 中, 由于 File 或 Blob 并不是 Worker 中的可 Transfer 对象
-
此处会导致 主线程与 Worker 通信时进行结构化克隆, 由此会产生额外的CPU性能消耗和内存消耗
-
而且如果文件很大时(大概超过2GB)会导致 Worker 线程 OOM (内存溢出错误)
3. 前端计算分片 Hash
3.1 目标
使用文件分片的 Hash 来标识文件分片, 用来判断这个分片是否已经上传过了
3.2 存在的问题与解决思路
计算文件分片 Hash 是一个 CPU 密集型任务, 直接在主线程中计算 hash 必定会导致 UI 卡死, 考虑放到 WebWorker 中计算 Hash
ArrayBuffer 是可 Transfer 的对象, 在主线程与 Worker 线程通信时, 可以通过移交控制权的方式通信, 避免线程通信引起的结构化克隆
分片之间的 Hash 计算没有关联, 而 WebWorker 可以用来开额外的计算线程, 考虑基于 WebWorker 实现线程池(WorkerPool)来加速计算分片 Hash
当文件较大时计算使用分片的 MD5值作为 Hash 计算速度仍然较慢, 但分片的 hash 其实只是为了标识分片, 对于唯一性要求并不高, 考虑在文件较大的场景下使用 CRC32 值作为分片的 Hash
CRC32的十六进制表示只有8位(MD5有32位), 且 CPU 对计算 CRC32 有硬件加速, 速度会比计算 MD5 快得多
3.3 Web Worker
用于计算 MD5 的 Worker
这里使用了 SparkMD5 计算文件的 MD5
1
2
3
4import { WorkerMessage } from './util/worker-message'
5import { WorkerLabelsEnum } from './types/worker-labels.enum'
6import SparkMD5 from 'spark-md5'
7
8addEventListener('message', ({ data }: { data: ArrayBuffer }) => {
9 const hash = SparkMD5.ArrayBuffer.hash(data)
10
11 postMessage(
12 new WorkerMessage(WorkerLabelsEnum.DONE, {
13 result: hash,
14 chunk: data,
15 }),
16 [data],
17 )
18})
用于计算 CRC32 的 Worker
1
2
3
4import { getCrc, getCrcHex } from '../utils/upload-helper'
5import { WorkerMessage } from './util/worker-message'
6import { WorkerLabelsEnum } from './types/worker-labels.enum'
7
8addEventListener('message', ({ data }: { data: ArrayBuffer }) => {
9 const crc = getCrc(data)
10 const hash = getCrcHex(crc)
11
12 postMessage(
13 new WorkerMessage(WorkerLabelsEnum.DONE, {
14 result: hash,
15 chunk: data,
16 }),
17 [data],
18 )
19})
WorkerMessage: 用于 Worker 线程向主线程通信
1
2import { WorkerLabelsEnum } from '../types/worker-labels.enum'
3
4export class WorkerMessage<T = any> {
5 label: WorkerLabelsEnum
6 content?: T
7
8 constructor(label: WorkerLabelsEnum, content?: T) {
9 this.label = label
10 this.content = content
11 }
12}
WorkerLabelsEnum: 用于标识 Worker Message 的类型
1
2export enum WorkerLabelsEnum {
3 INIT,
4 CHUNK,
5 DONE,
6}
WorkerRep: WorkerMessage 的进一步封装, 方便传泛型
1
2export interface WorkerRep<T = any> {
3 data: WorkerMessage<T>
4}
3.4 Worker Pool 的实现
使用 Worker Pool 来复用 Worker 而不是每次计算 hash 都开新的 Worker
WorkerWrapper: 基于 Promise 追踪当前 Worker 的运行状态
1import { WorkerRep } from './worker-message'
2import { WorkerLabelsEnum } from '../types/worker-labels.enum'
3
4export enum StatusEnum {
5 RUNNING = 'running',
6 WAITING = 'waiting',
7}
8
9export class WorkerWrapper {
10 worker: Worker
11 status: StatusEnum
12
13 constructor(
14 worker: Worker,
15 ) {
16 this.worker = worker
17 this.status = StatusEnum.WAITING
18 }
19
20 run<T>(param: ArrayBuffer, params: ArrayBuffer[], index: number) {
21 this.status = StatusEnum.RUNNING
22 return new Promise<T>((rs, rj) => {
23 this.worker.onmessage = ({ data }: WorkerRep<{ result: string; chunk: ArrayBuffer }>) => {
24 const { label, content } = data
25 if (label === WorkerLabelsEnum.DONE && content) {
26 params[index] = content.chunk
27 this.status = StatusEnum.WAITING
28 rs(content.result as T)
29 }
30 }
31 this.worker.onerror = (e) => {
32 this.status = StatusEnum.WAITING
33 rj(e)
34 }
35 this.worker.postMessage(param, [param])
36 })
37 }
38}
39
WorkerPool: 用于管理 WorkerWrapper, 实现 Worker 复用
核心思路是使用发布订阅模式来订阅当前 正在跑的 Worker 的数量(curRunningCount)
此处使用了 Rxjs 中的 BehaviorSubject, 也可以自己写一个 发布订阅模式来实现
只需要实现两个方法 subscribe() 和 next(), 其中 subscribe 用来订阅, next 用于发布新值
1import { StatusEnum, WorkerWrapper } from './worker-wrapper'
2import { BehaviorSubject } from 'rxjs'
3
4export abstract class WorkerPool {
5 pool: WorkerWrapper[] = []
6 maxWorkerCount: number
7 curRunningCount = new BehaviorSubject(0)
8 results: any[] = []
9
10 protected constructor(
11 maxWorkers = navigator.hardwareConcurrency || 4,
12 ) {
13 this.maxWorkerCount = maxWorkers
14 }
15
16 exec<T>(params: ArrayBuffer[]) {
17 this.results.length = 0
18 const workerParams = params.map(
19 (param, index) => ({ data: param, index }),
20 )
21
22 return new Promise<T[]>((rs) => {
23 this.curRunningCount.subscribe(count => {
24 if (count < this.maxWorkerCount && workerParams.length !== 0) {
25
26 let curTaskCount = this.maxWorkerCount - count
27 if (curTaskCount > params.length) {
28 curTaskCount = params.length
29 }
30
31
32 const canUseWorker: WorkerWrapper[] = []
33 for (const worker of this.pool) {
34 if (worker.status === StatusEnum.WAITING) {
35 canUseWorker.push(worker)
36 if (canUseWorker.length === curTaskCount) {
37 break
38 }
39 }
40 }
41
42 const paramsToRun = workerParams.splice(0, curTaskCount)
43
44
45 this.curRunningCount.next(this.curRunningCount.value + curTaskCount)
46 canUseWorker.forEach((workerApp, index) => {
47 const param = paramsToRun[index]
48 workerApp.run(param.data, params, param.index)
49 .then((res) => {
50 this.results[param.index] = res
51 })
52 .catch((e) => {
53 this.results[param.index] = e
54 })
55 .finally(() => {
56 this.curRunningCount.next(this.curRunningCount.value - 1)
57 })
58 })
59 }
60
61 if (this.curRunningCount.value === 0 && workerParams.length === 0) {
62 rs(this.results as T[])
63 }
64 })
65 })
66 }
67}
68
WorkerPoolForMd5s: 用于实现使用 Worker Pool 计算所有分片的 MD5 值
1import { WorkerWrapper } from './util/worker-wrapper'
2import { WorkerPool } from './util/worker-pool'
3
4export class WorkerPoolForMd5s extends WorkerPool {
5 constructor(maxWorkers: number) {
6 super(maxWorkers)
7 this.pool = Array.from({ length: this.maxWorkerCount }).map(
8 () =>
9 new WorkerWrapper(
10 new Worker(new URL('./md5-single.worker', import.meta.url)),
11 ),
12 )
13 }
14}
15
WorkerPoolForCrc32s: 用于实现使用 Worker Pool 计算所有分片的 CRC32 值
1import { WorkerPool } from './util/worker-pool'
2import { WorkerWrapper } from './util/worker-wrapper'
3
4export class WorkerPoolForCrc32s extends WorkerPool {
5 constructor(
6 maxWorkers = navigator.hardwareConcurrency || 4,
7 ) {
8 super(maxWorkers)
9 this.pool = Array.from({ length: this.maxWorkerCount }).map(
10 () =>
11 new WorkerWrapper(
12 new Worker(new URL('./crc32-single.worker', import.meta.url)),
13 ),
14 )
15 }
16}
17
3.5 使用 Worker Pool 计算分片的 hash 值
1export class WorkerService {
2 readonly MAX_WORKERS = 8
3 md5SingleWorkerPool: WorkerPoolForMd5s | undefined
4 crc32SingleWorkerPool: WorkerPoolForCrc32s | undefined
5
6
7 getMD5ForFiles(chunks: ArrayBuffer[]): stirng[] {
8 if (this.md5SingleWorkerPool === undefined) {
9 this.md5SingleWorkerPool = new WorkerPoolForMd5s(this.MAX_WORKERS)
10 }
11 return this.md5SingleWorkerPool.exec<string>(chunks)
12 }
13
14
15 getCRC32ForFiles(chunks: ArrayBuffer[]): stirng[] {
16 if (this.crc32SingleWorkerPool === undefined) {
17 this.crc32SingleWorkerPool = new WorkerPoolForCrc32s(this.MAX_WORKERS)
18 }
19 return this.crc32SingleWorkerPool.exec<string>(chunks)
20 }
21}
3.6 踩坑
-
这里如果将
new Worker(new URL('./crc32-single.worker', import.meta.url)
中的 url 拆出来作为一个变量传进去会导致在运行时浏览器无法正确拿到 Worker 文件, 具体原因未知, 可能是打包工具引起的问题 -
确保 WorkerPoolForMd5s 和 WorkerPoolForCrc32s 是单例的, 否则会导致浏览器创建过多的 Web Worker
-
Worker 是需要手动关闭的, 可以找到合适的时机去关掉所有的 Worker
3.7 性能实测
硬件情况: Ryzen9 5900HX + 32Gb DDR4
3种方式各算了两次
1计算 1.8GB 文件分片的 MD5
2[主线程中直接算(单线程)]
314585.115966796875 ms
414066.404052734375 ms
5
68线程 WebWorker => 比单线程快了 670%
72174.992919921875 ms
82169.323974609375 ms
9
1012线程 WebWorker => 比 8 单线程快了 19%, 比单线程快了 776%
111825.158935546875 ms
121878.386962890625 ms
总结: 使用多线程的方式可以使 hash 计算性能提高 6 ~ 7 倍
4. 前端计算文件 Hash
4.1 目标
计算文件的 Hash 用来标识这个文件是否已经上传过了
4.2 存在的问题与解决思路
计算全部文件的 hash 无法采用并行计算的方式, 实测假定用户上传 1.8GB 文件, 仅算文件 MD5 就要消耗 15秒 时间(不包括计算文件分片 hash 的时间)
考虑
- 使用 wasm
- 使用 MerkleTree(默克尔树) 的树根 hash 作为 文件的 hash (本文采用)
4.3 MerkleTree
每个叶子节点是对应数据分片的 Hash, 非叶子结点为它 2 个子节点的哈希, 从叶子结点层层向上计算 hash, 即得到 默克尔树根, 它可以用来校验数据集的完整性
如图所示, 其中的
hA hB hC hD hE hF hG hH 即为各个文件分片的 Hash
hAB 只是基于文件分片的 Hash 计算 Hash
所以得到默克尔树根的 hash 速度会非常快, 因为并没有直接计算全部文件的 hash, 只是根据全部分片的 hash 进行计算
最后使用 树根的 hash 作为 文件 hash, 这样即实现了标识文件的唯一, 计算速度又非常快
4.4 MerkleTree 的实现
此处仍然使用 SparkMD5 来计算 MD5 作为 hash
这里实现了 MerkleTree 的序列化和反序列化方法, 如果想保存整个 MerkleTree 树可以将这个序列化后的结果存入数据库, 我在这里只是用到了计算 MerkleTree 的树根
1import SparkMD5 from 'spark-md5'
2
3
4interface IMerkleNode {
5 h: string
6 l: IMerkleNode | null
7 r: IMerkleNode | null
8}
9
10
11interface IMerkleTree {
12 root: IMerkleNode
13 leafs: IMerkleNode[]
14
15}
16
17
18class MerkleNode implements IMerkleNode {
19 h: string
20 l: IMerkleNode | null
21 r: IMerkleNode | null
22
23 constructor(hash: string, left: IMerkleNode | null = null, right: IMerkleNode | null = null) {
24 this.h = hash
25 this.l = left
26 this.r = right
27 }
28}
29
30
31export class MerkleTree implements IMerkleTree {
32 root: IMerkleNode
33 leafs: IMerkleNode[]
34
35 constructor(hashList: string[])
36 constructor(leafNodes: IMerkleNode[])
37 constructor(nodes: string[] | IMerkleNode[]) {
38 if (nodes.length === 0) {
39 throw new Error('Empty Nodes')
40 }
41 if (typeof nodes[0] === 'string') {
42 this.leafs = nodes.map((node) => new MerkleNode(node as string))
43 } else {
44 this.leafs = nodes as IMerkleNode[]
45 }
46 this.root = this.buildTree()
47 }
48
49 getRootHash() {
50 return this.root.h
51 }
52
53 buildTree(): IMerkleNode {
54
55 let currentLevelNodes = this.leafs
56 while (currentLevelNodes.length > 1) {
57 const parentNodes: IMerkleNode[] = []
58 for (let i = 0; i < currentLevelNodes.length; i += 2) {
59 const left = currentLevelNodes[i]
60 const right = i + 1 < currentLevelNodes.length ? currentLevelNodes[i + 1] : null
61
62 const parentHash = this.calculateHash(left, right)
63 parentNodes.push(new MerkleNode(parentHash, left, right))
64 }
65 currentLevelNodes = parentNodes
66 }
67
68 return currentLevelNodes[0]
69 }
70
71
72 serialize(): string {
73 const serializeNode = (node: IMerkleNode | null): any => {
74 if (node === null) {
75 return null
76 }
77 return {
78 h: node.h,
79 l: serializeNode(node.l),
80 r: serializeNode(node.r),
81 }
82 }
83
84 const serializedRoot = serializeNode(this.root)
85 return JSON.stringify(serializedRoot)
86 }
87
88
89 static deserialize(serializedTree: string): MerkleTree {
90 const parsedData = JSON.parse(serializedTree)
91
92 const deserializeNode = (data: any): IMerkleNode | null => {
93 if (data === null) {
94 return null
95 }
96 return new MerkleNode(data.h, deserializeNode(data.l), deserializeNode(data.r))
97 }
98
99 const root = deserializeNode(parsedData)
100 if (!root) {
101 throw new Error('Invalid serialized tree data')
102 }
103
104
105
106
107 const extractLeafNodes = (node: IMerkleNode): IMerkleNode[] => {
108 if (node.l === null && node.r === null) {
109 return [node]
110 }
111 return [
112 ...(node.l ? extractLeafNodes(node.l) : []),
113 ...(node.r ? extractLeafNodes(node.r) : []),
114 ]
115 }
116 const leafNodes = extractLeafNodes(root)
117
118 return new MerkleTree(leafNodes)
119 }
120
121 private calculateHash(left: IMerkleNode, right: IMerkleNode | null): string {
122 return right ? SparkMD5.hash(left.h + right.h) : left.h
123 }
124}
125
4.5 使用 MerkleTree 树根的 Hash 作为文件 Hash
1
2const merkleTree = new MerkleTree(chunksHash)
3const fileHash = merkleTree.getRootHash()
5. 文件分片的并发上传
5.1 目标
多个文件分片可以同时上传到后端, 但不能使用 Promise.all() 直接将所有分片一起传到后端,
当文件分片数量较多时, 会导致同时开启的 HTTP 链接过多
使用一个 PromisePool 来控制同时处于 pending 状态的 Promise 的数量
注意:
使用 Promise.all() 只是用来收集 Promise 数组的执行结果, 它并不能用来控制同时处于 Pending 状态 Promise 的数量, 而 Promise 一旦创建了就会立即执行其中 new Promise() 中的同步代码(即发送网络请求),
所以需要创建 Promise 这个过程用函数包起来, 以实现当需要的时候再去执行
即函数调用的时候才会创建这个 Promise
5.2 实现
实现思路同 Worker Pool
接收一个 () => Promise<any>
数组作为任务
1import { BehaviorSubject } from 'rxjs'
2
3type AsyncFunction = () => Promise<any>
4
5export class PromisePool {
6 private readonly queue: { fn: AsyncFunction, index: number }[] = []
7 private readonly maxConcurrentTasks: number
8 private results: any[] = []
9
10 curRunningCount = new BehaviorSubject(0)
11
12 constructor(
13 functions: AsyncFunction[],
14 maxConcurrentTasks: number = navigator.hardwareConcurrency || 8,
15 ) {
16 this.queue = functions.map((fn, index) => ({ fn, index }))
17 this.maxConcurrentTasks = maxConcurrentTasks
18 }
19
20 exec<T>() {
21 return new Promise<T[]>((rs) => {
22 this.curRunningCount.subscribe((count) => {
23 if (count < this.maxConcurrentTasks && this.queue.length !== 0) {
24
25 let curTaskCount = this.maxConcurrentTasks - count
26 if (curTaskCount > this.queue.length) {
27 curTaskCount = this.queue.length
28 }
29
30 const tasks = this.queue.splice(0, curTaskCount)
31 this.curRunningCount.next(this.curRunningCount.value + curTaskCount)
32
33 tasks.forEach((taskWrap) => {
34 const { fn, index } = taskWrap
35 fn().then((result) => {
36 this.results[index] = result
37 }).catch((error) => {
38 this.results[index] = error
39 }).finally(() =>
40 this.curRunningCount.next(this.curRunningCount.value - 1)
41 )
42 })
43 }
44
45 if (this.curRunningCount.value === 0 && this.queue.length === 0) {
46 rs(this.results as T[])
47 }
48 })
49 })
50 }
51}
52
5.3 使用示例
可以看见同时处于 跑起来了状态的 Promise 只有 4 个
1async testPromisePool() {
2 const arr = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
3 const asyncFns = arr.map(
4 (num) => async () => {
5 await new Promise<number>((rs) => {
6 console.log('跑起来了: ' + num)
7 setTimeout(() => {
8 rs(num * 2)
9 }, 100)
10 })
11
12 return new Promise((rs) => {
13 setTimeout(() => {
14 rs('结果: ' + num * 10)
15 }, 2000)
16 })
17 },
18 )
19
20 const pool = new PromisePool(asyncFns, 4)
21 pool.exec().then((res) => {
22 console.log(res)
23 })
24}
6. 基于实际上传进度的进度计算
6.1 思路 (Axios)
使用可以设置其中的上传参数: onUploadProgress, 用于处理上传进度事件, 详见 axios-http.com/zh/docs/req…
6.2 思路 (Angular HTTP Client)
通过 Rxjs 的 pipe() 来处理上传事件, 详见 angular.cn/guide/http#…
6.3 基于 HTTP Client 的实现
封装 HttpClient, 其中 cb(event.loaded) 中的 event.loaded 即为当前请求已经上传了多少数据
1
2import { HttpClient, HttpEvent, HttpEventType, HttpRequest } from '@angular/common/http'
3import { BehaviorSubject, last, lastValueFrom, map, Observable } from 'rxjs'
4
5export class HttpService {
6 constructor(private http: HttpClient) {}
7
8 private getEventMessage(event: HttpEvent<any>, cb?: (current: any) => void) {
9 if (event.type === HttpEventType.UploadProgress) {
10 cb && cb(event.loaded)
11 }
12 }
13
14 postPWithProgress<T>(
15 url: string,
16 body: any,
17 extra: HttpExtraParam = {},
18 cb?: (current: number) => void,
19 ) {
20 const urlWithExtra = appendQueryParams(url, extra)
21 return lastValueFrom(
22 this.http
23 .request<T>(
24 new HttpRequest('POST', urlWithExtra, body, {
25 reportProgress: true,
26 }),
27 )
28 .pipe(
29 map((event) => this.getEventMessage(event, cb)),
30 last(),
31 ),
32 )
33 }
34}
完整应用详见 前端上传流程
7. 前端上传流程与策略
7.1 流程
- 获取文件元数据
- 文件分片
- 计算分片 Hash 与 文件 Hash
- 检查文件是否已经上传过
- 查询需要上传的文件分片
- 构建上传参数
- 上传实际需要上传的分片
- 待全部分片上传完成后校验分片
- 合并分片
7.2 策略
文件分片大小设定为 10MB 一个分片
Hash 策略
- 当文件分片数量 为1片时 (10MB 以下) 直接计算整个文件的 MD5
- 当文件分片数量小于 100 片时(1GB 以下) 基于分片的 MD5 计算 默克尔树根作为 文件 Hash
- 当文件分片数量大于 100 片时(1GB 以上) 基于分片的 CRC32 Hex 计算 默克尔树根作为 文件 Hash
7.3 实时上传进度的思路
其实类似于多线程状态下去刷一个数据(已上传数据的大小)
-
使用一个数组去存放所有请求的已上传文件大小, 初始值全是 0
-
按数组顺序刷写整个数组中各个请求的上传进度, 最后将数组中各元素求和, 即得到当前总的已上传大小
-
使用一个 定时器每隔 100ms 读一下当前上传的进度, 更新进度条即可
-
并不需要每个子上传进度上传时就更新进度条, 类似节流的思想
详见整体实现部分的代码
7.4 整体实现
1interface IMetaData {
2 size: number,
3 lastModified: number,
4 type: string
5}
6
7export class MinioUploaderService {
8
9 uploadStatus = new BehaviorSubject<string>('Please select a file.')
10
11 constructor(private uploadApiSvc: UploadApiService) {}
12
13 async doUpload(
14 file: File,
15 chunkSize: number,
16 cb: (progress: number) => void,
17 ) {
18
19 const BORDER_COUNT = 100
20
21
22 const fileSize = file.size / 1000
23
24
25 const metadata: IMetaData = {
26 size: file.size,
27 lastModified: file.lastModified,
28 type: file.type,
29 }
30
31
32 this.uploadStatus.next('Parsing file ...')
33 const chunksBlob = sliceFile(file, chunkSize)
34 const chunksBuf = await getArrayBufFromBlobsV2(chunksBlob)
35
36
37 let chunksHash: string[] = []
38 if (chunksBuf.length === 1) {
39 chunksHash = [getMD5FromArrayBuffer(chunksBuf[0])]
40 } else if (chunksBuf.length <= BORDER_COUNT) {
41 chunksHash = await this.workerSvc.getMD5ForFiles(chunksBuf)
42 } else {
43 chunksHash = await this.workerSvc.getCRC32ForFiles(chunksBuf)
44 }
45 const merkleTree = new MerkleTree(chunksHash)
46 const fileHash = merkleTree.getRootHash()
47
48
49 this.uploadStatus.next('Checking file if exist ...')
50 const { data: existUrl } = await this.uploadApiSvc.checkFileIfExist(fileHash, fileSize)
51 if (existUrl) {
52 this.uploadStatus.next('Completed.')
53 return existUrl
54 }
55
56
57 this.uploadStatus.next('Get the chunks that need to be uploaded ...')
58 const { data: _chunksNeedUpload } = await this.uploadApiSvc.getExistChunks(
59 fileHash,
60 chunksHash,
61 )
62
63
64 this.uploadStatus.next('Building upload params ...')
65 const paramsMap = new Map<string, FormData>()
66 chunksBlob.forEach((chunk, index) => {
67 const data = new FormData()
68 data.append('files', chunk)
69 data.set('name', file.name)
70 data.set('index', index.toString())
71 data.set('fileHash', fileHash)
72 data.set('chunkHash', chunksHash[index])
73 paramsMap.set(chunksHash[index], data)
74 })
75
76
77 const params = _chunksNeedUpload.map((chunkHash) => paramsMap.get(chunkHash)!)
78 this.uploadStatus.next('Uploading ...')
79
80
81 const total = file.size
82 const currentProgressList: number[] = []
83 const intervalId = setInterval(() => {
84 const current = currentProgressList.reduce((acc, cur) => acc + cur, 0)
85 cb(Math.ceil((current / total) * 100))
86 }, 150)
87
88 await new PromisePool(params.map((param, index) => () =>
89 this.uploadApiSvc.uploadChunks(param, (current) => {
90 currentProgressList[index] = current
91 })
92 )).exec()
93 clearInterval(intervalId)
94 cb(100)
95
96
97 this.uploadStatus.next('Verify uploaded chunks ...')
98 const { data: brokenChunksList } = await this.uploadApiSvc.verifyChunks2(fileHash, chunksHash)
99 if (brokenChunksList.length !== 0) {
100 console.log('brokenChunksList: ', brokenChunksList)
101 return ''
102 }
103
104
105 this.uploadStatus.next('Merging chunks ...')
106 const { data: url } = await this.uploadApiSvc.mergeChunks(fileHash, file.name, fileSize, metadata)
107 this.uploadStatus.next('Completed.')
108 return url
109 }
110}
8. 总结
目前存在的问题:
- 使用默克尔树计算树根 Hash 的方式依赖于分片大小, 如果分片大小改变 Hash 值会不同
- 在计算文件分片之前获取了全部文件分片的 ArrayBuffer, 这会导致将整个文件都读入到内存中, 后续会尝试优化一下
完整 Demo 仓库:
前端项目: github.com/Tkunl/kun-u…
后端项目: github.com/Tkunl/kun-u…
此外本文可能有少许不准确或者有误的地方,欢迎评论区赐教。最后,如果觉得还不错,对你有帮助的话,欢迎点赞、收藏、转发 ❤❤❤
后端部分详见: # 超详细的大文件分片上传⏫实战与优化(后端部分) juejin.cn/post/735436…
文章参考: 一文吃透👉大文件分片上传、断点续传、秒传⏫ juejin.cn/post/732414…
转载请注明原作者, 和原文链接, 谢谢 ! ! !
9. 重大性能优化更新
- 解决了在计算hash时浏览器内存占用过高的问题, 使用以下逻辑替换掉之前的 Hash 策略
现在每轮 hash 计算开始前会释放掉上一次计算 hash 时使用的 ArrayBuffer 从而大量减少内存占用
(之前会将文件的全部分片转为 ArrayBuffer 数组然后都存到内存中, 从而产生大量内存占用)
1
2this.uploadStatus.next('Parsing file ...')
3const chunksBlob = sliceFile(file, chunkSize)
4let chunksHash: string[] = []
5if (chunksBlob.length === 1) {
6 chunksHash = [getMD5FromArrayBuffer(await chunksBlob[0].arrayBuffer())]
7} else {
8 let chunksBuf: ArrayBuffer[] = []
9
10 const chunksPart = getArrParts<Blob>(chunksBlob, this.workerSvc.MAX_WORKERS)
11 const tasks = chunksPart.map(
12 (part) => async () => {
13
14
15 chunksBuf.length = 0
16 chunksBuf = await getArrayBufFromBlobsV2(part)
17
18 return chunksBlob.length <= BORDER_COUNT ?
19 await this.workerSvc.getMD5ForFiles(chunksBuf) :
20 await this.workerSvc.getCRC32ForFiles(chunksBuf)
21 },
22 )
23 for (const task of tasks) {
24 const result = await task()
25 chunksHash.push(...result)
26 }
27}
其中用到的 getArrParts 方法的实现
1
2 * 用于将数组分块 [1, 2, 3, 4] => [[1, 2], [3, 4]]
3 * @param chunks 原始数组
4 * @param size 分 part 大小
5 */
6function getArrParts<T>(chunks: any, size: number) {
7 const result: T[][] = []
8 let tempPart: T[] = []
9 chunks.forEach((chunk: T) => {
10 tempPart.push(chunk)
11 if (tempPart.length === size) {
12 result.push(tempPart)
13 tempPart = []
14 }
15 })
16 if (tempPart.length !== 0) result.push(tempPart)
17 return result
18}
- 现在计算 hash 的速度更快了
使用 hash-wasm 替换掉原来的 spark-md5
1
2
3
4import { md5 } from 'hash-wasm'
5import { WorkerMessage } from './util/worker-message'
6import { WorkerLabelsEnum } from './types/worker-labels.enum'
7
8addEventListener('message', async ({ data }: { data: ArrayBuffer }) => {
9 const hash = await md5(new Uint8Array(data))
10
11 postMessage(
12 new WorkerMessage(WorkerLabelsEnum.DONE, {
13 result: hash,
14 chunk: data,
15 }),
16 [data],
17 )
18})