fix: batch queue

This commit is contained in:
Gabe
2025-10-10 23:07:18 +08:00
parent 001f04a9ee
commit 951ce985b5
3 changed files with 15 additions and 13 deletions

View File

@@ -16,15 +16,14 @@ const BatchQueue = (
batchInterval = DEFAULT_BATCH_INTERVAL,
batchSize = DEFAULT_BATCH_SIZE,
batchLength = DEFAULT_BATCH_LENGTH,
...args
} = {}
) => {
const queue = [];
let isProcessing = false;
let timer = null;
const sendBatchRequest = async (payloads) => {
return taskFn(payloads, args);
const sendBatchRequest = async (payloads, batchArgs) => {
return taskFn(payloads, batchArgs);
};
const processQueue = async () => {
@@ -66,7 +65,8 @@ const BatchQueue = (
try {
const payloads = tasksToProcess.map((item) => item.payload);
const responses = await sendBatchRequest(payloads);
const batchArgs = tasksToProcess[0].args;
const responses = await sendBatchRequest(payloads, batchArgs);
if (!Array.isArray(responses)) {
throw new Error("responses format error");
}
@@ -99,10 +99,10 @@ const BatchQueue = (
}
};
const addTask = (data) => {
const addTask = (data, args) => {
return new Promise((resolve, reject) => {
const payload = data;
queue.push({ payload, resolve, reject });
queue.push({ payload, resolve, reject, args });
if (queue.length >= batchSize) {
processQueue();
@@ -132,12 +132,12 @@ const queueMap = new Map();
/**
* 获取批处理实例
*/
export const getBatchQueue = (key, taskFn, args) => {
export const getBatchQueue = (key, taskFn, options) => {
if (queueMap.has(key)) {
return queueMap.get(key);
}
const queue = BatchQueue(taskFn, args);
const queue = BatchQueue(taskFn, options);
queueMap.set(key, queue);
return queue;
};