Lessons Learned While Boosting aickyway's Image Processing Pipeline Performance
How do services like Instagram and Pinterest process images almost instantly? This tutorial shows how to build your own image pipeline, capable of handling thousands of images per second, by combining Node.js Worker Threads, Sharp, TensorFlow.js, Redis, and WebSocket.
## TL;DR
- Separate CPU tasks with Worker Threads to avoid blocking the main thread
- Optimize format, resize, and compression with Sharp
- AI enhancement (sharpening, noise reduction, etc.) with **TensorFlow.js (tfjs-node)**
- State caching with Redis + real-time notifications via Pub/Sub
- Progress streaming with WebSocket
- In production: auto-scaling, monitoring, and rate limiting
## First, a 10-Second Technical Glossary
- Worker Threads: Offloads CPU-heavy work to background threads in Node.js. The main event loop stays unblocked.
- Sharp: An ultra-fast image transformation library. Quickly handles resizing, format conversion (JPEG/WEBP/AVIF), and compression.
- TensorFlow.js (tfjs-node): An AI inference engine that runs in Node.js. Loads models to enhance images.
- Redis: An ultra-fast in-memory data store. Perfect for job state caching and real-time Pub/Sub.
- WebSocket: Bidirectional, real-time communication between server and browser, so progress bars update instantly.
## Architecture at a Glance
Upload → Queuing → Worker Thread (Sharp+AI) → Object Storage Save → Redis State Update/Publish → Client WebSocket Receive

## Project Setup
Node 18+ is recommended (it provides a global `fetch`).
```bash
mkdir image-processing-pipeline
cd image-processing-pipeline
npm init -y
# ESM + essential libraries
npm i express sharp @tensorflow/tfjs-node redis ws pino
# (optional) only if you prefer the uuid package over crypto.randomUUID
npm i uuid
```
Enable ESM in `package.json`:
```json
{
  "type": "module",
  "engines": { "node": ">=18" }
}
```
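❗️ Notes
- The package is **`@tensorflow/tfjs-node`**, not `tensorflow-node`.
- The browser-only `navigator.hardwareConcurrency` isn't available in Node 18/20; use **`os.cpus().length`** instead.
- `__dirname` doesn't exist in ES modules; derive paths from **`import.meta.url`** (with `fileURLToPath`, or directly via `new URL('./file.js', import.meta.url)`).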
## Server Entry: REST API + Redis + WS
// src/app.js
import http from 'node:http'
import express from 'express'
import { createClient } from 'redis'
import { WorkerPool } from './workers/pool.js'
import { setupWebSocket } from './websocket.js'
import os from 'node:os'
import { randomUUID } from 'node:crypto'
const app = express()
app.use(express.json({ limit: '2mb' }))

// Redis client for job state: one hash per job, keyed `job:<jobId>`.
const redis = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' })
await redis.connect()

// Separate connection handed to the worker pool for publishing job updates.
const pub = redis.duplicate()
await pub.connect()

// Worker pool sized to the CPU core count.
const workerPool = new WorkerPool({ size: os.cpus().length, pub, redis })

/**
 * Returns true when `value` is an absolute http(s) URL string.
 * Only the scheme is checked. Blocking internal hosts (SSRF) still needs an allow-list or a
 * resolved-IP check on top of this, because the worker fetches whatever URL it is given.
 * @param {unknown} value
 * @returns {boolean}
 */
function isHttpUrl(value) {
  if (typeof value !== 'string') return false
  try {
    const { protocol } = new URL(value)
    return protocol === 'http:' || protocol === 'https:'
  } catch {
    return false
  }
}

/**
 * POST /process: queue an image for processing.
 * Body: { imageUrl: string, options?: object }.
 * Responses: { jobId } on success, 400 for a missing or non-http(s) imageUrl, 500 if the job
 * could not be recorded.
 * NOTE(review): confirm the route path against client code.
 */
app.post('/process', async (req, res) => {
  try {
    const { imageUrl, options } = req.body ?? {}
    if (!isHttpUrl(imageUrl)) {
      return res.status(400).json({ error: 'imageUrl must be an http(s) URL' })
    }
    // Only forward an object; a null/primitive `options` would crash the worker on property access.
    const jobOptions = options !== null && typeof options === 'object' ? options : {}

    const jobId = randomUUID()
    const key = `job:${jobId}`
    // Record the job before queuing so GET /job/:jobId can see it immediately.
    await redis.hSet(key, { status: 'queued', progress: 0, createdAt: Date.now() })
    workerPool.queueJob({ jobId, imageUrl, options: jobOptions })
    res.json({ jobId })
  } catch (e) {
    console.error(e)
    res.status(500).json({ error: 'Failed to queue job' })
  }
})

/**
 * GET /job/:jobId: return the job's Redis hash, or 404 when no such job exists.
 */
app.get('/job/:jobId', async (req, res) => {
  // Express 4 does not catch rejected async handlers, so errors are handled here.
  try {
    const status = await redis.hGetAll(`job:${req.params.jobId}`)
    if (!status || Object.keys(status).length === 0) {
      return res.status(404).json({ error: 'Job not found' })
    }
    res.json(status)
  } catch (e) {
    console.error(e)
    res.status(500).json({ error: 'Failed to read job status' })
  }
})

const port = process.env.PORT || 3000
const server = http.createServer(app)
setupWebSocket(server, { redis })
server.listen(port, () => console.log(`Listening on ${port}`))
---
<br>
## Worker Pool: Safe and Resilient
```js
// src/workers/pool.js
import { Worker } from 'node:worker_threads'
import { fileURLToPath } from 'node:url'
import { dirname } from 'node:path'
/**
 * Fixed-size pool of worker threads.
 *
 * Each worker processes one job at a time. Jobs wait in a FIFO queue until a worker is idle.
 * Progress, completion and error messages from workers are written to the Redis hash
 * `job:<jobId>` and published on the channel of the same name. A worker that emits 'error' or
 * 'exit' is removed and replaced, and the job it was running is marked as failed.
 */
export class WorkerPool {
  /** Factory that creates each worker thread. */
  #createWorker

  /**
   * @param {object} params
   * @param {number} params.size - Number of worker threads to keep alive.
   * @param {object} params.pub - Redis client used to `publish` job updates.
   * @param {object} params.redis - Redis client used to `hSet` job state.
   * @param {() => object} [params.createWorker] - Returns a new worker (an EventEmitter with
   *   `postMessage`). Defaults to spawning `./worker.js` next to this module; override in tests.
   */
  constructor({
    size,
    pub,
    redis,
    createWorker = () => new Worker(new URL('./worker.js', import.meta.url)),
  }) {
    this.pub = pub
    this.redis = redis
    this.size = size
    this.queue = []
    this.#createWorker = createWorker
    this.workers = Array.from({ length: size }, () => this.#spawn())
  }

  /** Appends `job` ({ jobId, ... }) to the queue and dispatches work to any idle workers. */
  queueJob(job) {
    this.queue.push(job)
    this.#drain()
  }

  /** Creates a worker and wires its events to this pool. */
  #spawn() {
    const worker = this.#createWorker()
    worker.busy = false
    worker.currentJob = null
    worker.on('message', (msg) => this.#onMessage(worker, msg))
    // A real Worker emits 'exit' after 'error'; #retire ignores workers it already removed.
    // NOTE(review): a worker that fails during startup (e.g. model load) is respawned
    // immediately; add backoff if that can happen in production.
    worker.on('error', (err) => this.#retire(worker, err))
    worker.on('exit', (code) => this.#retire(worker, new Error(`Worker exited with code ${code}`)))
    return worker
  }

  /** Routes a `{ jobId, type, data }` message from a worker to Redis and frees the worker when done. */
  #onMessage(worker, msg) {
    const { jobId, type, data = {} } = msg
    const key = `job:${jobId}`

    if (type === 'progress') {
      void this.#persist(key, { status: 'processing', progress: data.progress })
      return
    }
    if (type === 'complete') {
      // Free the worker before touching Redis so a Redis failure cannot stall the queue.
      this.#release(worker)
      const { width = 0, height = 0, channels = 0 } = data.metadata ?? {}
      void this.#persist(
        key,
        { status: 'completed', progress: 100, resultUrl: data.url, width, height, channels, completedAt: Date.now() },
        { status: 'completed', progress: 100, resultUrl: data.url },
      )
      return
    }
    if (type === 'error') {
      this.#release(worker)
      void this.#persist(
        key,
        { status: 'error', error: data.error, completedAt: Date.now() },
        { status: 'error', error: data.error },
      )
    }
  }

  /** Marks `worker` idle and hands it the next queued job, if any. */
  #release(worker) {
    worker.busy = false
    worker.currentJob = null
    this.#drain()
  }

  /**
   * Removes a crashed/exited worker, fails the job it was running, and spawns a replacement.
   * Idempotent: a worker that is no longer in the pool is ignored.
   */
  #retire(worker, err) {
    const index = this.workers.indexOf(worker)
    if (index === -1) return
    this.workers.splice(index, 1)
    console.error('[worker error]', err)

    const job = worker.currentJob
    worker.busy = false
    worker.currentJob = null
    if (job) {
      const error = `Worker crashed: ${err?.message ?? String(err)}`
      void this.#persist(`job:${job.jobId}`, { status: 'error', error, completedAt: Date.now() }, { status: 'error', error })
    }

    this.workers.push(this.#spawn())
    this.#drain()
  }

  /** Dispatches queued jobs to idle workers until either runs out. */
  #drain() {
    while (this.queue.length > 0) {
      const idle = this.workers.find((w) => !w.busy)
      if (!idle) return
      const job = this.queue.shift()
      idle.busy = true
      idle.currentJob = job
      idle.postMessage(job)
    }
  }

  /**
   * Writes `fields` to the job hash, then publishes `message` on the job channel.
   * hSet is issued synchronously on call, so updates for one job keep their order.
   * Failures are logged rather than thrown, because callers fire-and-forget.
   */
  async #persist(key, fields, message = fields) {
    try {
      await this.redis.hSet(key, fields)
      await this.pub.publish(key, JSON.stringify(message))
    } catch (err) {
      console.error(`[worker pool] failed to record update for ${key}`, err)
    }
  }
}
```

## Worker: AI Enhancement with Sharp + tfjs-node

```js
// src/workers/worker.js
import { parentPort } from 'node:worker_threads'
import { randomUUID } from 'node:crypto'
import { mkdir, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { pathToFileURL } from 'node:url'
import sharp from 'sharp'
import * as tf from '@tensorflow/tfjs-node'

// NOTE(review): model location/format are assumptions. tf.loadLayersModel expects a
// Layers-format model.json; switch to tf.loadGraphModel for a converted SavedModel.
const MODEL_URL = process.env.MODEL_URL ?? 'file://./models/enhance/model.json'

// Abort downloads that take longer than this so one slow origin cannot pin a worker.
const FETCH_TIMEOUT_MS = 15_000

// Upper bound on decoded input size (width * height pixels) to guard against decompression bombs.
const MAX_INPUT_PIXELS = 50_000_000

// Output encoders keyed by `options.format`; any other value falls back to WebP.
const ENCODERS = {
  avif: (image, quality) => image.avif({ quality }),
  jpeg: (image, quality) => image.jpeg({ quality, mozjpeg: true }),
  webp: (image, quality) => image.webp({ quality }),
}

// Loaded once when the worker starts (top-level await) and reused for every job.
const model = await tf.loadLayersModel(MODEL_URL)

/**
 * Placeholder storage backend: writes `buffer` to a temp directory and returns a file:// URL.
 * TODO: replace with your object store (S3, GCS, ...) and return a publicly reachable URL.
 * @param {Buffer} buffer - encoded image bytes
 * @param {string} extension - file extension without the dot
 * @returns {Promise<string>}
 */
async function uploadToStorage(buffer, extension) {
  const dir = join(tmpdir(), 'image-pipeline')
  await mkdir(dir, { recursive: true })
  const filePath = join(dir, `${randomUUID()}.${extension}`)
  await writeFile(filePath, buffer)
  return pathToFileURL(filePath).href
}

/** Sends a progress update for `jobId` to the pool. */
const reportProgress = (jobId, progress) =>
  parentPort.postMessage({ jobId, type: 'progress', data: { progress } })

/**
 * Runs the enhancement model on an encoded (PNG) image and returns raw 8-bit pixels.
 * Assumes the model maps a [1, H, W, 3] float input in [0, 1] to an output with the same
 * layout and a single output tensor. TODO confirm against the model.
 * @param {Buffer} encodedImage
 * @returns {Promise<{ data: Buffer, width: number, height: number, channels: number }>}
 */
async function enhance(encodedImage) {
  // tidy() disposes every intermediate tensor, even if predict() throws.
  const output = tf.tidy(() => {
    const img = tf.node.decodeImage(encodedImage, 3)
    const input = img.expandDims(0).toFloat().div(255)
    const pred = model.predict(input)
    // squeeze only the batch axis so 1-pixel-wide/high images keep their shape
    return pred.squeeze([0]).mul(255).clipByValue(0, 255).toInt()
  })
  try {
    const [height, width, channels = 1] = output.shape
    const values = await output.data()
    return { data: Buffer.from(Uint8Array.from(values)), width, height, channels }
  } finally {
    output.dispose()
  }
}

parentPort.on('message', async ({ jobId, imageUrl, options }) => {
  try {
    const opts = options ?? {}

    // 1) Download the source image.
    const res = await fetch(imageUrl, { signal: AbortSignal.timeout(FETCH_TIMEOUT_MS) })
    if (!res.ok) throw new Error(`Failed to fetch image: HTTP ${res.status}`)
    const inputBuffer = Buffer.from(await res.arrayBuffer())
    reportProgress(jobId, 10)

    // 2) Normalise with sharp: apply EXIF orientation and bound the size. Emit a fast,
    //    lossless PNG, because tf.node.decodeImage cannot decode WebP/AVIF.
    const maxW = opts.maxWidth ?? 2048
    const maxH = opts.maxHeight ?? 2048
    const normalized = await sharp(inputBuffer, { limitInputPixels: MAX_INPUT_PIXELS })
      .rotate()
      .resize(maxW, maxH, { fit: 'inside', withoutEnlargement: true })
      .png({ compressionLevel: 0 })
      .toBuffer()
    reportProgress(jobId, 50)

    // 3) AI enhancement, then a single encode in the requested format/quality.
    const { data, width, height, channels } = await enhance(normalized)
    const format = Object.hasOwn(ENCODERS, opts.format) ? opts.format : 'webp'
    const quality = opts.quality ?? 80
    const finalBuffer = await ENCODERS[format](
      sharp(data, { raw: { width, height, channels } }),
      quality,
    ).toBuffer()
    reportProgress(jobId, 90)

    // 4) Upload to storage.
    const url = await uploadToStorage(finalBuffer, format)
    parentPort.postMessage({
      jobId,
      type: 'complete',
      data: { url, metadata: { width, height, channels } },
    })
  } catch (e) {
    parentPort.postMessage({
      jobId,
      type: 'error',
      data: { error: e instanceof Error ? e.message : String(e) },
    })
  }
})

```

---
<br>
## Real-Time Progress: Redis Pub/Sub + WebSocket
```js
// src/websocket.js
import WebSocket, { WebSocketServer } from 'ws'
/**
 * Attaches a WebSocket server to `server` that streams job updates to clients.
 *
 * Protocol: the client sends `{ "type": "subscribe", "jobId": "<id>" }`. The server subscribes
 * to Redis channel `job:<id>` and then replies with the current hash as `{ jobId, status }`.
 * After that it forwards every published message as `{ jobId, status: <parsed message> }`.
 * Other message types are ignored; invalid input is answered with `{ error }`.
 *
 * All sockets share one Redis subscriber connection (created on first use) rather than opening
 * a connection per subscription.
 *
 * @param {import('node:http').Server} server
 * @param {{ redis: object }} deps - connected node-redis client, used for reads and duplicated for Pub/Sub
 */
export function setupWebSocket(server, { redis }) {
  const wss = new WebSocketServer({ server })

  let subscriberPromise = null
  const getSubscriber = () => {
    subscriberPromise ??= (async () => {
      const sub = redis.duplicate()
      await sub.connect()
      return sub
    })().catch((err) => {
      subscriberPromise = null // let the next subscribe retry the connection
      throw err
    })
    return subscriberPromise
  }

  wss.on('connection', (ws) => {
    // Redis channel -> { listener, ready } for this socket; `ready` resolves to the shared
    // subscriber once SUBSCRIBE has completed, so 'close' never unsubscribes too early.
    const subscriptions = new Map()

    const send = (payload) => {
      if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(payload))
    }

    const subscribe = (jobId, key) => {
      const listener = (message) => {
        let status
        try {
          status = JSON.parse(message)
        } catch {
          return // ignore non-JSON payloads instead of throwing inside the Redis client
        }
        send({ jobId, status })
      }
      const ready = getSubscriber().then(async (sub) => {
        await sub.subscribe(key, listener)
        return sub
      })
      return { listener, ready }
    }

    ws.on('message', async (raw) => {
      try {
        const { type, jobId } = JSON.parse(raw)
        if (type !== 'subscribe') return
        if (typeof jobId !== 'string' || jobId === '') {
          throw new Error('jobId must be a non-empty string')
        }
        const key = `job:${jobId}`

        // Re-subscribing to the same job reuses the existing listener.
        let entry = subscriptions.get(key)
        if (!entry) {
          entry = subscribe(jobId, key)
          subscriptions.set(key, entry)
        }
        try {
          await entry.ready
        } catch (err) {
          if (subscriptions.get(key) === entry) subscriptions.delete(key)
          throw err
        }

        // Snapshot taken after subscribing, so an update published in between is not lost.
        send({ jobId, status: await redis.hGetAll(key) })
      } catch (e) {
        send({ error: e.message })
      }
    })

    ws.on('close', () => {
      for (const [key, { listener, ready }] of subscriptions) {
        ready.then(
          (sub) =>
            sub.unsubscribe(key, listener).catch((err) => console.error(`[ws] unsubscribe from ${key} failed`, err)),
          () => {}, // SUBSCRIBE never succeeded, so there is nothing to undo
        )
      }
      subscriptions.clear()
    })

    // Without an 'error' listener, a socket error would surface as an uncaught exception.
    ws.on('error', (err) => console.error('[ws] socket error', err))
  })
}

```

## Performance Tips (Production-Ready Checklist)
- Batching: Route jobs with the same options to the same worker to benefit from disk and model-load caching.
- Memory safeguards: Limit risk with Sharp's `limitInputPixels` constructor option (`sharp(input, { limitInputPixels })`), a maximum upload size, and `AbortSignal.timeout` on downloads.
- Backpressure: When the queue grows, apply rate limiting (HTTP 429) or spool jobs to temporary storage.
- On-demand scaling: Adjust the worker count based on queue length and processing time.








