前端大文件上传 - 总结(Vue3 + hook + Web Worker实现,通过多个Worker线程大大提高Hash计算的速度)

03-10 6172阅读 0评论
流程图前端大文件上传 - 总结(Vue3 + hook Web Worker实现,通过多个Worker线程大大提高Hash计算的速度) 第1张

根据流程图,下面详细说一下具体的实现

选择文件

基于 el-upload 实现;用户选择文件之后,监听 el-upload 的Change事件,将 File 对象保存;原生的实现也是同理只要保存 File 对象就行。

当用户点击提交之后才进行分片操作,不然等待分片完成才提交需要一定时间(770MB的文件耗时是16s)

文件分片&计算文件hash值

为了判断文件是否上传过,必须使用根据文件内容生成的hash值判断。而这个过程可以在分片的时候计算。通过 SparkMD5 进行增量更新,这里的增量更新是指:在计算第一个分块的哈希值后,对接下来的分块只计算新增内容的哈希值,而不是对整个分块重新计算。

其他方案:之前前后两个完整的分块,其他分块分别只取前中后几个字节计算。这种方案可以大大减少时间,但是无法确保hash值的唯一性。

这里我们使用第一种方案,同时文件切片的执行交由 Web Worker 处理。

代码实现:

// 大文件切片上传,worker.js
import SparkMD5 from 'spark-md5';
const DefaultChunkSize = 1024 * 1024 * 50; // 50MB
// const DefaultChunkSize = 1024 * 1024 * 1; // 1MB
self.onmessage = (e) => {
	const { file, chunkSize = DefaultChunkSize } = e.data;
	let blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice,
		chunks = Math.ceil(file.size / chunkSize),
		currentChunk = 0,
		spark = new SparkMD5.ArrayBuffer(),
		fileChunkHashList = [],
		fileChunkList = [],
		fileReader = new FileReader();
	loadNext();
	function loadNext() {
		let start = currentChunk * chunkSize,
			end = ((start + chunkSize) >= file.size) ? file.size : start + chunkSize;
		let chunk = blobSlice.call(file, start, end);
		fileChunkList.push(chunk);
		fileReader.readAsArrayBuffer(chunk);
	}
	function getChunkHash(e) {
		const chunkSpark = new SparkMD5.ArrayBuffer();
		chunkSpark.append(e.target.result);
		fileChunkHashList.push(chunkSpark.end());
	}
	// 处理每一块的分片
	fileReader.onload = function (e) {
		spark.append(e.target.result); // Append array buffer
		currentChunk++;
		getChunkHash(e)
		if (currentChunk  
 
 

当用户点击提交时执行 handleCutFile 函数,该函数会创建一个 worker ,并执行对应的文件切片脚本,当文件切片完成会返回对应的 fileMd5 、fileChunkList 和 fileChunkHashList。然后执行 handleCutSuccess 函数,就是对应流程图后面的操作

async function handleCutFile() {
	// Vite中使用worker.js脚本
	const worker = new Worker(new URL('@/workers/cutFile.js', import.meta.url), {
		type: 'module',
	})
	
	worker.postMessage({ file: file.value.raw })
	worker.onmessage = (e) => {
		handleCutSuccess(e.data)
		worker.terminate()
	}
}
文件上传

检查文件是否上传过

async function checkFile() {
	// 这个接口要配置防响应拦截
	const params = {
		filename: fileName.value,
		file_hash: fileMd5.value,
		total_chunks: chunkTotal.value,
	}
	const data = await checkFileFn(params)
	// 已经上传过返回已上传的分块IDs:chunk_upload[]
	if (data.code === 0) {
		return data.data
	}
	if (data.code === 1) {
		// 空间不足
		modal.msgError(t('sample.notEnoughSpace'))
		return false
	}
	modal.msgError(data.msg)
	return false
}

根据返回的分片ID(索引 + 1)列表处理对应逻辑

async function uploadFile() {
	const data = await checkFile()
	if (!data) return
	const { chunk_upload, upload_id } = data
	uploadId.value = upload_id
	if (chunk_upload.length === 0) {
		// 上传整个文件
	}
	
	if (chunk_upload.length !== chunkTotal.value) {
		// 上传未上传的分片 - 断点续传
	}
	// 上传完成 - 秒传(可能需要发起合并请求)
}

处理所有分片的请求 & 进度的获取

// 通过请求池的方式上传文件
async function handleUploadRequest(uploadedChunks = []) {
	const requestList = []
	for (let i = 0; i  {
			chunkUploadedSize[index] = progressEvent.loaded
			// 计算uploadedSize的总和
			const size = Object.values(chunkUploadedSize).reduce((total, item) => {
				return total + item
			}, 0)
			// 计算总的上传进度
			uploadProgress = ((size + uploadedSize) / fileSize.value) * 100
			store.dispatch('upload/setProgress', uploadProgress)
		}
	}
}

通过 async 控制并发请求的数量,npm i -D async;当全部发送完成时发起合并请求。

async function processRequests(requestList) {
	// 同时发起三个请求
	mapLimit(requestList, 3, async (req) => {
		await request(req)
	}, async (err) => {
		if (err) {
			console.log('err: ', err)
			return false
		}
		// 全部发送成功应该发起合并请求
		await mergeFileFn({ upload_id: uploadId.value })
		return true
	})
}
完整代码

这里通过 hook 封装了这一块代码。代码中并没有处理文件上传完成,但没有发起合并请求的情况,因为后端没返回这种情况,所以这里没写;建议与后端进行沟通要求考虑全部情况。

import request from '@/utils/request'
import modal from '@/plugins/modal'
import { mapLimit } from 'async'
import { useI18n } from "vue-i18n"
const DefaultChunkSize = 1024 * 1024 * 50; // 50MB
const useCutFile = ({ uploadUrl, checkFileFn, mergeFileFn, callback }) => {
	const store = useStore();
	const { t } = useI18n()
	const file = ref(null)
	const fileName = computed(() => file.value?.name || '')
	const fileSize = computed(() => file.value?.size || 0)
	const fileMd5 = ref('')
	const fileChunkList = ref([])
	const fileChunkHashList = ref([])
	const chunkTotal = ref(0)
	// 上传id
	const uploadId = ref('')
	// 上传进度
	let uploadProgress = 0
	// 每一个分片已上传的大小
	let chunkUploadedSize = {}
	// 断点续传时已上传的大小
	let uploadedSize = 0
	// 监听上传文件弹框的文件改变
	async function handleUploadChange(fileObj) {
		file.value = fileObj
	}
	// 开始处理文件分块
	async function handleCutFile() {
		const worker = new Worker(new URL('@/workers/cutFile.js', import.meta.url), {
			type: 'module',
		})
		// 文件切块的过程不可点击
		worker.postMessage({ file: file.value.raw })
		worker.onmessage = (e) => {
			handleCutSuccess(e.data)
			worker.terminate()
		}
	}
	// 切片文件成功
	async function handleCutSuccess(data) {
		fileMd5.value = data.fileMd5
		fileChunkList.value = data.fileChunkList
		fileChunkHashList.value = data.fileChunkHashList
		chunkTotal.value = fileChunkList.value.length
		uploadFile()
	}
	// 上传文件
	async function uploadFile() {
		const data = await checkFile()
		if (!data) return
		const { chunk_upload, upload_id } = data
		uploadId.value = upload_id
		if (chunk_upload.length === 0) {
			// 上传整个文件
			return await handleUploadRequest()
		}
		// 上传未上传的分片,过滤已上传的分片 - 断点续传
		if (chunk_upload.length !== chunkTotal.value) {
			uploadedSize = chunk_upload.length * DefaultChunkSize
			return await handleUploadRequest(chunk_upload)
		}
		// 上传完成 - 秒传
		store.dispatch('upload/setSampleUploading', false)
		modal.msgSuccess(t('upload.uploadedTip'))
		resetData()
		return true
	}
	// 检查文件是否已经上传过
	async function checkFile() {
		// 这个接口要配置防响应拦截
		const params = {
			filename: fileName.value,
			file_hash: fileMd5.value,
			total_chunks: chunkTotal.value,
		}
		const { code, msg, data } = await checkFileFn(params)
		// 已经上传过返回对应hash值
		if (code === 0) {
			return data
		}
		modal.msgError(msg)
		store.dispatch('upload/setSampleUploading', false)
		return false
	}
	// 处理分片文件的上传
	function uploadChunk(chunk, index, fileMd5) {
		const params = {
			chunk_id: index + 1,
			file_hash: fileMd5,
			upload_id: uploadId.value,
			chunk_hash: fileChunkHashList.value[index],
		}
		const formData = new FormData()
		formData.append('file_chunk', chunk)
		return {
			url: uploadUrl,
			method: 'post',
			timeout: 5 * 60 * 1000,
			data: formData,
			params,
			skipInterceptor: true,
			headers: {
				// 以前的人写了避免重复提交
				repeatSubmit: false,
			},
			onUploadProgress: (progressEvent) => {
				chunkUploadedSize[index] = progressEvent.loaded
				// 计算uploadedSize的总和
				const size = Object.values(chunkUploadedSize).reduce((total, item) => {
					return total + item
				}, 0)
				// 计算总的上传进度
				uploadProgress = ((size + uploadedSize) / fileSize.value) * 100
				store.dispatch('upload/setProgress', uploadProgress)
			}
		}
	}
	// 通过请求池的方式上传文件
	async function handleUploadRequest(uploadedChunks = []) {
		const requestList = []
		for (let i = 0; i  {
			await request(reqItem)
		}, async (err) => {
			if (err) {
				console.log('err: ', err)
				modal.msgError(t('upload.error'))
				store.dispatch('upload/setSampleUploading', false)
				resetData()
				return false
			}
			await mergeFileFn({ upload_id: uploadId.value })
			modal.msgSuccess(t('upload.success'))
			callback && callback()
			store.dispatch('upload/setSampleUploading', false)
			resetData()
			return true
		})
	}
	// 上传成功,还原数据
	function resetData() {
		fileMd5.value = ''
		fileChunkList.value = []
		fileChunkHashList.value = []
		chunkTotal.value = 0
		uploadId.value = ''
		uploadProgress = 0
		chunkUploadedSize = {}
		uploadedSize = 0
	}
	return {
		file,
		handleUploadChange,
		handleCutFile,
		handleCutSuccess,
		uploadFile,
		resetData,
	}
}
export default useCutFile
额外–上传组件封装
	
		
			
				
			
			
{{ t('upload.drag') }} {{ t('upload.upload') }}
{{ tipText || t('upload.onlyCsv') }} ({{ t('upload.downloadTemplate2') }} )
import modal from '@/plugins/modal' import { genFileId } from 'element-plus' import { download } from '@/utils/request' import { useI18n } from 'vue-i18n' const { t } = useI18n() const props = defineProps({ // 弹框参数 visible: { type: Boolean, default: false, }, title: { type: String, default: 'Upload File', }, width: { type: String, default: '450px', }, // 上传参数 hasAuthorization: { type: Boolean, default: true, }, // * 任意文件 accept: { type: String, default: '.csv', }, action: { type: String, default: '', }, showFileList: { type: Boolean, default: true, }, autoUpload: { type: Boolean, default: false, }, // 500MB size: { type: Number, default: 500, }, tipText: { type: String, default: '', }, templateUrl: { type: String, default: '', }, templateName: { type: String, default: 'template', }, customDownload: { type: Boolean, default: false, }, downloadMethod: { type: String, default: 'post', }, autoSubmit: { type: Boolean, default: true, }, }) const emit = defineEmits(['change', 'remove', 'success', 'error', 'submit', 'cut-success']) const store = useStore() const getHeaders = computed(() => { if (props.hasAuthorization) { return { Authorization: 'Bearer ' + store.getters.token } } return {} }) const actionUrl = computed(() => { return props.action ? `${ import.meta.env.VITE_APP_BASE_API }${props.action}` : '' }) const loading = ref(false) const uploadRef = ref() const isAbort = ref(false) const disabled = ref(true) const fileObj = ref(null) const handleBeforeUpload = (file) => { if(isAbort.value) { abort(file) return } loading.value = true } const handleChange = (file) => { const isLt = file.size / 1024 / 1024 item.substring(1)) : [] // 以第一个.后面的所有作为文件后缀 const tmp = file.name.split('.') tmp.shift() const fileExtension = tmp.join('.').toLowerCase() if (!isLt) { modal.msgError(`${t('upload.sizeLimit')}${props.size}MB!`) isAbort.value = true return false } if (allowedExtensions.length && !allowedExtensions.includes(fileExtension)) { modal.msgError(`${t('upload.fileType')} ${allowedExtensions.join(', ')}`) isAbort.value = true return false } disabled.value = false fileObj.value = file emit('change', file) return true } const handleRemove = () => { disabled.value = true emit('remove', null, null) } const handleSuccess = (res, file) => { if (res.code && res.code !== 0) { emit('change', null, null) uploadRef.value.clearFiles() modal.msgError(res.msg, 10000) loading.value = false return } modal.msgSuccess(t('upload.success')) // 不知道为什么,这里触发的就算多级嵌套也能在父级接收到 emit('success', res, file) emit('update:visible', false) loading.value = false } const handleError = (err) => { modal.msgError(t('upload.error')) emit('error', err) loading.value = false } const handleExceed = (files) => { uploadRef.value.clearFiles() const file = files[0] file.uid = genFileId() uploadRef.value.handleStart(file) } const handleDownload = () => { if (props.customDownload) { emit('download') return } if (props.templateName.includes('.')) { download(props.templateUrl, {}, props.templateName) return } download(props.templateUrl, {}, `${props.templateName}${props.accept}`, props.downloadMethod) } const handleReset = () => { uploadRef.value.clearFiles() } const handleConfirm = () => { if (props.autoSubmit) { uploadRef.value.submit() } else { emit('submit', uploadRef.value, fileObj.value) } } defineExpose({ handleReset, }) .content { margin-top: 18px; }
文件Hash计算优化

思路:就是通过多个Web Worker处理一个文件,将这个文件分成对应数量的Chunk给不同的Web Worker处理;这里只做粗略的计算,就是按照200M分一个Web Worker计算,最多5个Web Worker。因为调研发现Web Worker不是越多越好,至于多少个最合适我也不知道;因为目前公司业务需求上传的文件最多只接受5G,平均算下来1G一个Web Worker计算Hash的时间也只是20s;现在1G以内的文件都是5s算完。

问题: 实际上这样计算跟单个Web Worker计算的Hash值是不同的

代码实现,下面是改变的部分
	// Web Worker的数量
	let workerNum = 1
	// 存储多个Web Worker
	const workers = []
	// 开始处理文件分块
	async function handleCutFile() {
		// 缓存worker返回的数据
		const workerArr = new Array(workerNum).fill(null)
		let count = 0
		workerNum = Math.min(Math.ceil(file.value.size / DefaultWorkSize), MaxWorker)
		for (let i = 0; i  {
			fileHashes.value.push(item.fileMd5)
			fileChunkList.value.push(...item.fileChunkList)
			fileChunkHashList.value.push(...item.fileChunkHashList)
		})
		fileMd5.value = data.length === 1 ? data[0].fileMd5 : calculateFileHash()
		chunkTotal.value = fileChunkList.value.length
		uploadFile()
	}
展望

后面更改上传功能为选择多个文件,但是同时只能上传一个(带宽有限,参考了一下其他的线上上传也是同时只能有一个在上传);但是多个文件又涉及多个Web Worker计算问题所以需要解决的问题挺多,到时候再另外起一篇文章总结。


免责声明
1、本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明。
2、本网站转载文章仅为传播更多信息之目的,凡在本网站出现的信息,均仅供参考。本网站将尽力确保所
提供信息的准确性及可靠性,但不保证信息的正确性和完整性,且不对因信息的不正确或遗漏导致的任何
损失或损害承担责任。
3、任何透过本网站网页而链接及得到的资讯、产品及服务,本网站概不负责,亦不负任何法律责任。
4、本网站所刊发、转载的文章,其版权均归原作者所有,如其他媒体、网站或个人从本网下载使用,请在
转载有关文章时务必尊重该文章的著作权,保留本网注明的“稿件来源”,并白负版权等法律责任。

手机扫描二维码访问

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

发表评论

快捷回复: 表情:
评论列表 (暂无评论,6172人围观)

还没有评论,来说两句吧...

目录[+]