Lessons Learned While Boosting aickyway's Image Processing Pipeline Performance

How do places like Instagram and Pinterest process images in an instant? This tutorial covers how to build your own image pipeline capable of handling thousands of images per second by combining Node.js Worker Threads + Sharp + TensorFlow.js + Redis + WebSocket.



## TL;DR
  • Separate CPU tasks with Worker Threads to avoid blocking the main thread
  • Optimize format, resize, and compression with Sharp
  • AI enhancement (sharpening, noise reduction, etc.) with **TensorFlow.js (tfjs-node)**
  • State caching with Redis + real-time notifications via Pub/Sub
  • Progress streaming with WebSocket
  • In production: auto-scaling, monitoring, and rate limiting


## First, a 10-Second Technical Glossary
  • Worker Threads: Offloads CPU-heavy work to background threads in Node.js. The main event loop stays unblocked.
  • Sharp: An ultra-fast image transformation library. Quickly handles resizing, format conversion (JPEG/WEBP/AVIF), and compression.
  • TensorFlow.js (tfjs-node): An AI inference engine that runs in Node.js. Loads models to enhance images.
  • Redis: An ultra-fast in-memory data store. Perfect for job state caching and real-time Pub/Sub.
  • WebSocket: Bidirectional real-time communication from server to browser. Progress bars respond instantly.


## Architecture at a Glance

Upload → Queuing → Worker Thread (Sharp+AI) → Object Storage Save → Redis State Update/Publish → Client WebSocket Receive



## Project Setup

Node 18+ recommended (global fetch support)

mkdir image-processing-pipeline
cd image-processing-pipeline
npm init -y
# ESM + essential libraries
npm i express sharp @tensorflow/tfjs-node redis ws pino
# (optional) If you need UUID
npm i uuid

Enable ESM in package.json.

{
  "type": "module",
  "engines": { "node": ">=18" }
}

❗️Note

  • It's **@tensorflow/tfjs-node**, not tensorflow-node.
  • Browser-only navigator.hardwareConcurrency doesn't exist in Node. Use **os.cpus().length**.
  • __dirname doesn't exist in ESM. Use **import.meta.url + fileURLToPath**.


## Server Entry: REST API + Redis + WS
// src/app.js
// Server entry point: REST API for queuing/polling jobs, Redis for job state,
// WebSocket for real-time progress (wired up in ./websocket.js).
import http from 'node:http'
import express from 'express'
import { createClient } from 'redis'
import { WorkerPool } from './workers/pool.js'
import { setupWebSocket } from './websocket.js'
import os from 'node:os'
import { randomUUID } from 'node:crypto'

const app = express()
app.use(express.json({ limit: '2mb' }))

// Redis client (cache/state)
const redis = createClient({ url: process.env.REDIS_URL || 'redis://localhost:6379' })
await redis.connect()
// Duplicate client for Pub/Sub: a connection in subscribe mode cannot issue
// regular commands, so publishing gets its own connection.
const pub = redis.duplicate()
await pub.connect()

// Worker pool sized to CPU core count
const workerPool = new WorkerPool({ size: os.cpus().length, pub, redis })

// POST /process — validate input, record the job in Redis, hand it to the pool.
app.post('/process', async (req, res) => {
  try {
    const { imageUrl, options = {} } = req.body || {}
    if (!imageUrl) return res.status(400).json({ error: 'imageUrl is required' })

    const jobId = randomUUID()
    const key = `job:${jobId}`

    await redis.hSet(key, { status: 'queued', progress: 0, createdAt: Date.now() })
    workerPool.queueJob({ jobId, imageUrl, options })
    res.json({ jobId })
  } catch (e) {
    console.error(e)
    res.status(500).json({ error: 'Failed to queue job' })
  }
})

// GET /job/:jobId — return the job's current state hash from Redis.
app.get('/job/:jobId', async (req, res) => {
  const key = `job:${req.params.jobId}`
  const status = await redis.hGetAll(key)
  if (!status || Object.keys(status).length === 0) {
    return res.status(404).json({ error: 'Job not found' })
  }
  res.json(status)
})

const server = http.createServer(app)
setupWebSocket(server, { redis })
const port = process.env.PORT || 3000
// Log the actual bound port (the original hard-coded "3000" in the message).
server.listen(port, () => console.log(`Listening on ${port}`))


---
<br>
## Worker Pool: Safe and Resilient

```js
// src/workers/pool.js
import { Worker } from 'node:worker_threads'
import { fileURLToPath } from 'node:url'
import { dirname } from 'node:path'

const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)

/**
 * Fixed-size pool of worker threads that process image jobs from a FIFO queue.
 * Worker results are persisted to Redis and broadcast over Pub/Sub so the
 * WebSocket layer can push progress to clients.
 *
 * @param {object} opts
 * @param {number} opts.size  Number of worker threads to keep alive.
 * @param {object} opts.pub   Connected Redis client dedicated to publishing.
 * @param {object} opts.redis Connected Redis client for job-state hashes.
 */
export class WorkerPool {
  constructor({ size, pub, redis }) {
    this.pub = pub
    this.redis = redis
    this.size = size
    this.queue = []
    this.workers = Array.from({ length: size }, () => this.#spawn())
  }

  #spawn() {
    const worker = new Worker(new URL('./worker.js', import.meta.url))
    worker.busy = false

    worker.on('message', async (msg) => {
      const { jobId, type, data } = msg
      const key = `job:${jobId}`
      if (type === 'progress') {
        await this.redis.hSet(key, { status: 'processing', progress: data.progress })
        await this.pub.publish(key, JSON.stringify({ status: 'processing', progress: data.progress }))
      } else if (type === 'complete') {
        worker.busy = false
        await this.redis.hSet(key, {
          status: 'completed',
          progress: 100,
          resultUrl: data.url,
          width: data.metadata.width,
          height: data.metadata.height,
          channels: data.metadata.channels,
          completedAt: Date.now()
        })
        await this.pub.publish(key, JSON.stringify({ status: 'completed', progress: 100, resultUrl: data.url }))
        this.#drain()
      } else if (type === 'error') {
        worker.busy = false
        await this.redis.hSet(key, { status: 'error', error: data.error, completedAt: Date.now() })
        await this.pub.publish(key, JSON.stringify({ status: 'error', error: data.error }))
        this.#drain()
      }
    })

    worker.on('error', (err) => {
      console.error('[worker error]', err)
      // Fix: remove the crashed worker before spawning a replacement.
      // The original left the dead worker in the pool with busy=false, so
      // #drain could hand jobs to a dead thread while the pool kept growing.
      this.workers = this.workers.filter((w) => w !== worker)
      this.workers.push(this.#spawn())
      this.#drain()
    })

    return worker
  }

  /** Enqueue a job ({ jobId, imageUrl, options }) and dispatch if possible. */
  queueJob(job) {
    this.queue.push(job)
    this.#drain()
  }

  // Dispatch queued jobs to every idle worker (the original stopped after one
  // dispatch per call, leaving idle workers unused when jobs were queued in bursts).
  #drain() {
    while (this.queue.length > 0) {
      const idle = this.workers.find((w) => !w.busy)
      if (!idle) return
      idle.busy = true
      idle.postMessage(this.queue.shift())
    }
  }
}
```
## Worker: AI Enhancement with Sharp + tfjs-node
// src/workers/worker.js
// Worker thread body: downloads an image, optimizes it with sharp, enhances it
// with a tfjs-node model, uploads the result, and reports progress to the pool.
import { parentPort } from 'node:worker_threads'
import sharp from 'sharp'
import * as tf from '@tensorflow/tfjs-node'

// Load the enhancement model once per worker thread; reused for every job.
const model = await loadModel()

async function loadModel() {
  // NOTE(review): point this at your real model artifact (file:// or https://).
  return tf.loadGraphModel('file://./models/enhance/model.json')
}

parentPort.on('message', async ({ jobId, imageUrl, options }) => {
  try {
    // 1) Download the source image (Node 18+ global fetch) with a timeout guard.
    const res = await fetch(imageUrl, { signal: AbortSignal.timeout(15000) })
    if (!res.ok) throw new Error(`Download failed: ${res.status}`)
    const arr = await res.arrayBuffer()
    const inputBuffer = Buffer.from(arr)

    parentPort.postMessage({ jobId, type: 'progress', data: { progress: 10 } })

    // 2) Resize / recompress with sharp.
    const maxW = options.maxWidth ?? 1920
    const maxH = options.maxHeight ?? 1080
    const quality = options.quality ?? 80

    let pipeline = sharp(inputBuffer)
      .rotate()       // honor EXIF orientation
      .withMetadata() // keep ICC profile / orientation metadata
      .resize(maxW, maxH, { fit: 'inside', withoutEnlargement: true })

    // Pick the output codec (default: webp).
    const format = options.format ?? 'webp'
    if (format === 'jpeg') pipeline = pipeline.jpeg({ quality, mozjpeg: true })
    else if (format === 'avif') pipeline = pipeline.avif({ quality, effort: 4 })
    else pipeline = pipeline.webp({ quality, effort: 4 })
    const optimized = await pipeline.toBuffer()

    parentPort.postMessage({ jobId, type: 'progress', data: { progress: 50 } })

    // 3) AI enhancement with tfjs-node: normalize to [0,1], predict, rescale.
    const img = tf.node.decodeImage(optimized, 3)
    const input = img.expandDims(0).toFloat().div(255)
    const pred = model.predict(input)
    const out = pred.squeeze().mul(255).clipByValue(0, 255).toInt()
    const finalBuffer = await tf.node.encodePng(out)
    tf.dispose([img, input, pred, out]) // release tensor memory explicitly

    parentPort.postMessage({ jobId, type: 'progress', data: { progress: 90 } })

    // 4) Upload to object storage (uploadToStorage is implemented per provider).
    const url = await uploadToStorage(finalBuffer)

    // Fix: report real output dimensions (the original hard-coded width/height 0).
    const meta = await sharp(finalBuffer).metadata()
    parentPort.postMessage({
      jobId,
      type: 'complete',
      data: {
        url,
        metadata: {
          width: meta.width ?? 0,
          height: meta.height ?? 0,
          channels: meta.channels ?? 3
        }
      }
    })
  } catch (e) {
    parentPort.postMessage({ jobId, type: 'error', data: { error: e.message } })
  }
})


 ![](https://storage.aickyway.com/api/files/post-images/2025/09/02/ba57b19d54d14420b783f812fc332103_ChatGPTImage20259212_53_16.webp)

---
<br>
## Real-Time Progress: Redis Pub/Sub + WebSocket

```js
// src/websocket.js
import WebSocket, { WebSocketServer } from 'ws'

/**
 * Attach a WebSocket server that streams job progress to clients.
 * A client sends { type: 'subscribe', jobId } and receives the current job
 * state immediately, then live updates relayed from Redis Pub/Sub.
 *
 * Fixes over the original:
 *  - One dedicated subscriber connection per socket, torn down and replaced on
 *    re-subscribe (the original opened a new Redis connection and registered a
 *    new 'close' listener on every subscribe message — both leaked).
 *  - Uses WebSocket.OPEN instead of the magic number 1.
 */
export function setupWebSocket(server, { redis }) {
  const wss = new WebSocketServer({ server })

  wss.on('connection', (ws) => {
    let sub = null
    let subscribedKey = null

    // Best-effort teardown of the current Redis subscription, if any.
    const cleanup = async () => {
      if (!sub) return
      try {
        if (subscribedKey) await sub.unsubscribe(subscribedKey)
        await sub.quit()
      } catch (_) { /* connection may already be gone */ }
      sub = null
      subscribedKey = null
    }

    ws.on('message', async (raw) => {
      try {
        const { type, jobId } = JSON.parse(raw)
        if (type !== 'subscribe') return
        const key = `job:${jobId}`

        // Replace any previous subscription instead of stacking a new one.
        await cleanup()
        subscribedKey = key

        // Send the current state right away so the UI never starts blank.
        const status = await redis.hGetAll(key)
        ws.send(JSON.stringify({ jobId, status }))

        // Dedicated connection: a subscribing Redis client cannot run
        // regular commands, so it must not share the main client.
        sub = redis.duplicate()
        await sub.connect()
        await sub.subscribe(key, (message) => {
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({ jobId, status: JSON.parse(message) }))
          }
        })
      } catch (e) {
        ws.send(JSON.stringify({ error: e.message }))
      }
    })

    ws.on('close', cleanup)
  })
}
```
The result: a clean, dashboard-style UI where a progress bar updates in real time as WebSocket messages arrive, backed by Redis Pub/Sub.



## Performance Tips (Production-Ready Checklist)
  • Batching: Batch jobs with the same options to a single worker to benefit from disk/model load caching.
  • Memory Safeguards: Block risks with sharp.limitInputPixels, max upload size, and AbortSignal.timeout.
  • Backpressure: When queue length grows, apply rate limiting (HTTP 429) or spool to temporary storage.
  • On-Demand Scaling: Adjust worker count based on queue length/processing time.
## Quick Benchmark Script

## Production Checklist