refactor: rewrite task pool

This commit is contained in:
Gabe
2025-09-03 22:07:49 +08:00
parent 343edcdbad
commit 7dc847fca2

View File

@@ -3,74 +3,132 @@ import { kissLog } from "./log";
/** /**
* 任务池 * 任务池
* @param {*} _interval
* @param {*} _limit
* @param {*} _retryInteral
* @returns
*/ */
const TaskPool = (_interval = 100, _limit = 100, _retryInteral = 1000) => { class TaskPool {
const pool = []; #pool = [];
const maxRetry = 2; // 最大重试次数
let maxCount = _limit; // 最大数量
let curCount = 0; // 当前数量
let interval = _interval; // 间隔时间
let timer = null;
const run = async () => { #maxRetry = 2; // 最大重试次数
// console.log("timer", timer); #retryInterval = 1000; // 重试间隔时间
timer && clearTimeout(timer); #limit; // 最大并发数
timer = setTimeout(run, interval); #interval; // 任务最小启动间隔
if (curCount < maxCount) { #currentConcurrent = 0; // 当前正在执行的任务数
const item = pool.shift(); #lastExecutionTime = 0; // 上一个任务的启动时间
if (item) { #schedulerTimer = null; // 用于调度下一个任务的定时器
curCount++;
const { fn, args, resolve, reject, retry } = item; constructor(
try { interval = DEFAULT_FETCH_INTERVAL,
const res = await fn(args); limit = DEFAULT_FETCH_LIMIT,
resolve(res); retryInterval = 1000
} catch (err) { ) {
kissLog(err, "task"); this.#interval = interval;
if (retry < maxRetry) { this.#limit = limit;
const retryTimer = setTimeout(() => { this.#retryInterval = retryInterval;
clearTimeout(retryTimer); }
pool.push({ fn, args, resolve, reject, retry: retry + 1 });
}, _retryInteral); /**
} else { * 调度器
reject(err); */
} #scheduleNext() {
} finally { if (this.#schedulerTimer) {
curCount--; return;
}
if (this.#currentConcurrent >= this.#limit || this.#pool.length === 0) {
return;
}
const now = Date.now();
const timeSinceLast = now - this.#lastExecutionTime;
const delay = Math.max(0, this.#interval - timeSinceLast);
this.#schedulerTimer = setTimeout(() => {
this.#schedulerTimer = null;
if (this.#currentConcurrent < this.#limit && this.#pool.length > 0) {
const task = this.#pool.shift();
if (task) {
this.#lastExecutionTime = Date.now();
this.#execute(task);
} }
} }
}
};
return { if (this.#pool.length > 0) {
push: async (fn, args) => { this.#scheduleNext();
if (!timer) {
run();
} }
return new Promise((resolve, reject) => { }, delay);
pool.push({ fn, args, resolve, reject, retry: 0 }); }
});
}, /**
update: (_interval = 100, _limit = 100) => { * 执行单个任务
if (_interval >= 0 && _interval <= 5000 && _interval !== interval) { * @param {object} task - 任务对象
interval = _interval; */
async #execute(task) {
this.#currentConcurrent++;
const { fn, args, resolve, reject, retry } = task;
try {
const res = await fn(args);
resolve(res);
} catch (err) {
kissLog(err, "task");
if (retry < this.#maxRetry) {
setTimeout(() => {
this.#pool.unshift({ ...task, retry: retry + 1 }); // unshift 保证重试任务优先
this.#scheduleNext();
}, this.#retryInterval);
} else {
reject(err);
} }
if (_limit >= 1 && _limit <= 100 && _limit !== maxCount) { } finally {
maxCount = _limit; this.#currentConcurrent--;
} this.#scheduleNext();
}, }
clear: () => { }
pool.length = 0;
curCount = 0; /**
timer && clearTimeout(timer); * 向任务池中添加一个新任务
timer = null; * @param {Function} fn - 要执行的异步函数
}, * @param {*} args - 函数的参数
}; * @returns {Promise}
}; */
push(fn, args) {
return new Promise((resolve, reject) => {
this.#pool.push({ fn, args, resolve, reject, retry: 0 });
this.#scheduleNext();
});
}
/**
* 更新任务池的配置
* @param {number} interval - 新的最小任务间隔
* @param {number} limit - 新的最大并发数
*/
update(interval, limit) {
if (interval >= 0) {
this.#interval = interval;
}
if (limit >= 1) {
this.#limit = limit;
}
this.#scheduleNext();
}
/**
* 清空任务池
*/
clear() {
for (const task of this.#pool) {
task.reject("the task pool was cleared");
}
this.#pool.length = 0;
if (this.#schedulerTimer) {
clearTimeout(this.#schedulerTimer);
this.#schedulerTimer = null;
}
}
}
/** /**
* 请求池实例 * 请求池实例
@@ -79,20 +137,19 @@ let fetchPool;
/** /**
* 获取请求池实例 * 获取请求池实例
* @param {*} interval * @param interval
* @param {*} limit * @param limit
* @returns * @returns
*/ */
export const getFetchPool = (interval, limit) => { export const getFetchPool = (interval, limit) => {
if (!fetchPool) { if (!fetchPool) {
fetchPool = TaskPool( fetchPool = new TaskPool(
interval ?? DEFAULT_FETCH_INTERVAL, interval ?? DEFAULT_FETCH_INTERVAL,
limit ?? DEFAULT_FETCH_LIMIT limit ?? DEFAULT_FETCH_LIMIT
); );
} else if (interval && limit) { } else if (interval && limit) {
updateFetchPool(interval, limit); updateFetchPool(interval, limit);
} }
return fetchPool; return fetchPool;
}; };
@@ -102,12 +159,12 @@ export const getFetchPool = (interval, limit) => {
* @param {*} limit * @param {*} limit
*/ */
export const updateFetchPool = (interval, limit) => { export const updateFetchPool = (interval, limit) => {
fetchPool && fetchPool.update(interval, limit); fetchPool?.update(interval, limit);
}; };
/** /**
* 清空请求池 * 清空请求池
*/ */
export const clearFetchPool = () => { export const clearFetchPool = () => {
fetchPool && fetchPool.clear(); fetchPool?.clear();
}; };