Paralized

This commit is contained in:
Sewmina 2025-10-11 23:44:39 +05:30
parent ea9a10b578
commit d2bbbc3f35
2 changed files with 294 additions and 86 deletions

View File

@ -1,17 +1,22 @@
import { createCanvas, loadImage } from "canvas";
import { Worker } from 'worker_threads';
import { readReplay } from "./utils/replay_reader";
import { drawTimeBasedHeatmap, getReplayDuration } from "./utils/drawer";
import { getReplayDuration } from "./utils/drawer";
import * as cliProgress from "cli-progress";
import * as os from 'os';
import * as fs from 'fs';
import * as path from 'path';
const replayPath = "/home/warlock/.var/app/com.unity.UnityHub/config/unity3d/Milk Carton Games/Super Cloudfight/Replays/";
const fs = require("fs");
// Get all JSON file paths from replayPath
const path = require("path");
const replayFiles = fs.readdirSync(replayPath)
.filter((file: string) => file.endsWith(".json"))
.map((file: string) => path.join(replayPath, file));
// Timelapse configuration
const targetFPS = 60;
const timelapseSpeed = 5;
// Create multibar container
const multibar = new cliProgress.MultiBar({
clearOnComplete: false,
@ -19,17 +24,35 @@ const multibar = new cliProgress.MultiBar({
format: '{bar} | {filename} | {view} | {value}/{total} frames | {percentage}%'
}, cliProgress.Presets.shades_classic);
async function processReplayFile(filePath: string) {
interface WorkerTask {
filePath: string;
viewMode: 'top' | 'side' | 'front';
baseFileName: string;
replayDir: string;
targetFPS: number;
timelapseSpeed: number;
totalFrames: number;
startFrame: number;
endFrame: number;
chunkId: number;
}
interface TaskWithBar extends WorkerTask {
progressBar: cliProgress.SingleBar;
}
interface ActiveWorkerInfo {
id: number;
task: TaskWithBar;
promise: Promise<void>;
}
async function prepareTasksForReplay(filePath: string, chunksPerView: number): Promise<WorkerTask[]> {
const replayData = readReplay(filePath);
const baseFileName = path.basename(filePath, '.json');
// Timelapse configuration
const targetFPS = 60; // Output video FPS
const timelapseSpeed = 5; // 5x, 10x, etc.
const framesPerSecond = targetFPS / timelapseSpeed; // Frames to generate per second of real-time
// Get replay duration in seconds
const duration = getReplayDuration(replayData);
const framesPerSecond = targetFPS / timelapseSpeed;
const totalFrames = Math.ceil(duration * framesPerSecond);
// Create main directory for this replay
const replayDir = baseFileName;
@ -37,89 +60,193 @@ async function processReplayFile(filePath: string) {
fs.mkdirSync(replayDir);
}
// Create subdirectories for each view
const views: Array<{mode: 'top' | 'side' | 'front', dirName: string}> = [
{ mode: 'top', dirName: 'top' },
{ mode: 'side', dirName: 'side' },
{ mode: 'front', dirName: 'front' }
];
const views: Array<'top' | 'side' | 'front'> = ['top', 'side', 'front'];
const tasks: WorkerTask[] = [];
// Calculate total number of frames to generate
const totalFrames = Math.ceil(duration * framesPerSecond);
for (const viewMode of views) {
// Split frames into chunks
const framesPerChunk = Math.ceil(totalFrames / chunksPerView);
// Create progress bars for each view
const progressBars = new Map<string, cliProgress.SingleBar>();
for (const { mode } of views) {
const bar = multibar.create(totalFrames, 0, {
filename: baseFileName,
view: mode.padEnd(5, ' ')
});
progressBars.set(mode, bar);
}
for (let chunkId = 0; chunkId < chunksPerView; chunkId++) {
const startFrame = chunkId * framesPerChunk;
const endFrame = Math.min(startFrame + framesPerChunk, totalFrames);
// Reuse a single canvas for all frames to reduce memory allocation
const canvas = createCanvas(1024, 1024);
// Skip empty chunks
if (startFrame >= totalFrames) continue;
for (const { mode, dirName } of views) {
const viewDir = path.join(replayDir, dirName);
if (!fs.existsSync(viewDir)) {
fs.mkdirSync(viewDir);
}
// Get background image path and preload it once
const backgroundImagePath = path.join(__dirname, '..', 'maps', 'hurricane', `${mode}.png`);
let preloadedBackground;
try {
preloadedBackground = await loadImage(backgroundImagePath);
} catch (error) {
console.error(`Failed to preload background image: ${backgroundImagePath}`, error);
}
const progressBar = progressBars.get(mode)!;
// Generate heatmaps at the calculated frame rate
for (let frameNumber = 0; frameNumber < totalFrames; frameNumber++) {
// Calculate the time point for this frame
const timePoint = frameNumber / framesPerSecond;
// Generate time-based heatmap (reusing the same canvas)
await drawTimeBasedHeatmap(canvas, replayData, timePoint, {
width: 200, // Heatmap resolution
height: 200,
worldSize: 1024, // World size in units
sensitivity: 0.7, // Controls sensitivity to lower values (0.1-2.0)
blurAmount: 3, // Controls smoothness of heat areas (1-5)
viewMode: mode, // View mode: 'top', 'side', or 'front'
preloadedBackground: preloadedBackground
tasks.push({
filePath,
viewMode,
baseFileName,
replayDir,
targetFPS,
timelapseSpeed,
totalFrames,
startFrame,
endFrame,
chunkId
});
const outputPath = path.join(viewDir, `${(frameNumber + 1).toString().padStart(3, '0')}.png`);
fs.writeFileSync(outputPath, canvas.toBuffer("image/png"));
// Update progress bar
progressBar.update(frameNumber + 1);
// Allow the event loop to process (enables real-time progress bar updates)
if (frameNumber % 5 === 0) {
await new Promise(resolve => setImmediate(resolve));
}
}
// Force garbage collection hint after each view
if (global.gc) {
global.gc();
}
}
return tasks;
}
// Process all replay files sequentially
(async () => {
for (const filePath of replayFiles) {
await processReplayFile(filePath);
function createWorker(task: TaskWithBar, progressTracker: Map<string, number>): Promise<void> {
return new Promise((resolve, reject) => {
const workerPath = path.join(__dirname, 'worker.js');
const worker = new Worker(workerPath, {
workerData: {
filePath: task.filePath,
viewMode: task.viewMode,
baseFileName: task.baseFileName,
replayDir: task.replayDir,
targetFPS: task.targetFPS,
timelapseSpeed: task.timelapseSpeed,
startFrame: task.startFrame,
endFrame: task.endFrame,
chunkId: task.chunkId
}
});
const key = `${task.baseFileName}:${task.viewMode}`;
worker.on('message', (message) => {
if (message.type === 'progress') {
// Update shared progress tracker
const currentFrame = message.frame;
const currentProgress = progressTracker.get(key) || 0;
if (currentFrame > currentProgress) {
progressTracker.set(key, currentFrame);
task.progressBar.update(currentFrame);
}
} else if (message.type === 'complete') {
resolve();
} else if (message.type === 'error') {
reject(new Error(message.error));
}
});
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
async function processWithWorkerPool(allTasks: WorkerTask[]) {
const numCPUs = os.cpus().length;
const maxWorkers = numCPUs * 1;
console.log(`Using ${numCPUs} CPU cores for parallel processing (max ${maxWorkers} workers)`);
// Create status bar for active workers
const statusBar = multibar.create(maxWorkers, 0, {
filename: 'Active Workers',
view: ''
});
// Group tasks by replay and view to create consolidated progress bars
const progressBars = new Map<string, cliProgress.SingleBar>();
const progressTotals = new Map<string, number>();
const progressCurrent = new Map<string, number>();
// Create one progress bar per view
for (const task of allTasks) {
const key = `${task.baseFileName}:${task.viewMode}`;
if (!progressBars.has(key)) {
const bar = multibar.create(task.totalFrames, 0, {
filename: task.baseFileName,
view: task.viewMode.padEnd(5, ' ')
});
progressBars.set(key, bar);
progressTotals.set(key, task.totalFrames);
progressCurrent.set(key, 0);
}
}
// Stop multibar after all processing is complete
multibar.stop();
// Assign progress bars to tasks
const tasksWithBars: TaskWithBar[] = allTasks.map(task => {
const key = `${task.baseFileName}:${task.viewMode}`;
return { ...task, progressBar: progressBars.get(key)! };
});
// Process tasks in parallel with worker pool
const activeWorkers = new Map<number, ActiveWorkerInfo>();
const progressTracker = new Map<string, number>();
let taskIndex = 0;
let workerId = 0;
const updateStatusBar = () => {
statusBar.update(activeWorkers.size, {
filename: `Active Workers: ${activeWorkers.size}/${maxWorkers}`,
view: Array.from(activeWorkers.values())
.map(w => `#${w.id}:${w.task.viewMode[0]}${w.task.chunkId}`)
.join(' ')
.substring(0, 60)
});
};
while (taskIndex < tasksWithBars.length || activeWorkers.size > 0) {
// Fill up to max workers
while (activeWorkers.size < maxWorkers && taskIndex < tasksWithBars.length) {
const task = tasksWithBars[taskIndex++];
const currentWorkerId = workerId++;
const workerPromise = createWorker(task, progressTracker).then(() => {
activeWorkers.delete(currentWorkerId);
updateStatusBar();
});
activeWorkers.set(currentWorkerId, {
id: currentWorkerId,
task: task,
promise: workerPromise
});
updateStatusBar();
}
// Wait for at least one worker to complete
if (activeWorkers.size > 0) {
await Promise.race(Array.from(activeWorkers.values()).map(w => w.promise));
}
}
statusBar.update(0, { filename: 'All workers completed', view: '' });
}
// Main execution
(async () => {
try {
console.log(`Found ${replayFiles.length} replay file(s)`);
// Calculate chunks per view to maximize worker utilization
const numCPUs = os.cpus().length;
const maxWorkers = numCPUs * 1;
const totalViews = replayFiles.length * 3; // 3 views per replay
const chunksPerView = Math.ceil(maxWorkers / totalViews);
console.log(`Will split each view into ${chunksPerView} chunks to utilize ${maxWorkers} workers`);
const allTasks: WorkerTask[] = [];
// Prepare all tasks
for (const filePath of replayFiles) {
const tasks = await prepareTasksForReplay(filePath, chunksPerView);
allTasks.push(...tasks);
}
console.log(`Total tasks to process: ${allTasks.length} chunks (${replayFiles.length} replays × 3 views × ~${chunksPerView} chunks/view)`);
// Process all tasks in parallel
await processWithWorkerPool(allTasks);
multibar.stop();
console.log('\nAll processing complete!');
} catch (error) {
multibar.stop();
console.error('Error during processing:', error);
process.exit(1);
}
})();

81
src/worker.ts Normal file
View File

@ -0,0 +1,81 @@
import { parentPort, workerData } from 'worker_threads';
import { createCanvas, loadImage } from "canvas";
import { readReplay } from "./utils/replay_reader";
import { drawTimeBasedHeatmap, getReplayDuration } from "./utils/drawer";
import * as path from "path";
import * as fs from "fs";
interface WorkerTask {
filePath: string;
viewMode: 'top' | 'side' | 'front';
baseFileName: string;
replayDir: string;
targetFPS: number;
timelapseSpeed: number;
startFrame: number;
endFrame: number;
chunkId: number;
}
async function processView(task: WorkerTask) {
const replayData = readReplay(task.filePath);
const framesPerSecond = task.targetFPS / task.timelapseSpeed;
// Create view directory
const viewDir = path.join(task.replayDir, task.viewMode);
if (!fs.existsSync(viewDir)) {
fs.mkdirSync(viewDir, { recursive: true });
}
// Reuse a single canvas for all frames
const canvas = createCanvas(1024, 1024);
// Get background image path and preload it
const backgroundImagePath = path.join(__dirname, '..', 'maps', 'hurricane', `${task.viewMode}.png`);
let preloadedBackground;
try {
preloadedBackground = await loadImage(backgroundImagePath);
} catch (error) {
console.error(`Failed to preload background image: ${backgroundImagePath}`, error);
}
// Generate heatmaps for the assigned frame range
for (let frameNumber = task.startFrame; frameNumber < task.endFrame; frameNumber++) {
const timePoint = frameNumber / framesPerSecond;
await drawTimeBasedHeatmap(canvas, replayData, timePoint, {
width: 200,
height: 200,
worldSize: 1024,
sensitivity: 0.7,
blurAmount: 3,
viewMode: task.viewMode,
preloadedBackground: preloadedBackground
});
const outputPath = path.join(viewDir, `${(frameNumber + 1).toString().padStart(3, '0')}.png`);
fs.writeFileSync(outputPath, canvas.toBuffer("image/png"));
// Send progress update
if (parentPort) {
parentPort.postMessage({
type: 'progress',
frame: frameNumber + 1
});
}
}
// Send completion message
if (parentPort) {
parentPort.postMessage({ type: 'complete' });
}
}
if (parentPort && workerData) {
processView(workerData as WorkerTask).catch(error => {
if (parentPort) {
parentPort.postMessage({ type: 'error', error: error.message });
}
});
}