mirror of
https://github.com/pnpm/pnpm.git
synced 2026-04-28 11:01:30 -04:00
## Summary
Adds an opt-in **pnpm agent** server that resolves dependencies server-side and streams only the files missing from the client's content-addressable store.
- **`@pnpm/agent.server`** — multi-process HTTP server (Node.js `cluster`) with SQLite-backed metadata and file caches
- **`@pnpm/agent.client`** — streams an NDJSON response, dispatches worker threads to fetch files while the server is still resolving
- **New config**: `agent` in `pnpm-workspace.yaml` (opt-in)
## How it works
1. Client reads integrity hashes from its local store index
2. Sends `POST /v1/install` with dependencies + store integrities
3. Server resolves the dependency tree using pnpm's `install({ lockfileOnly: true })`, with a SQLite-backed `PackageMetaCache` for fast repeat resolution
4. As each package resolves, a wrapped `storeController.requestPackage` looks up its files and immediately streams digests the client is missing (NDJSON `D` lines)
5. Client reads the stream line by line; digest batches fill up and dispatch worker threads to `POST /v1/files` — file downloads overlap with server-side resolution
6. After resolution, server sends index entries (`I` lines) and lockfile (`L` line)
7. Client writes index entries to store, then runs headless install with a wrapped `fetchPackage` that calls `readPkgFromCafs` with `verifyStoreIntegrity: false` (files are trusted from the agent)
8. `/v1/files` response is gzip-streamed (274MB → ~80MB) — server pipes through `createGzip`, worker pipes through `createGunzip`, parsing and writing files to CAFS as data arrives
## Performance
1351-package project, cold local store, warm server (localhost):
| Scenario | Time |
|----------|------|
| Vanilla pnpm install (cold OS cache) | ~48s |
| Vanilla pnpm install (warm OS cache) | ~34s |
| With pnpm agent (consistent) | **~33s** |
### Key optimizations
1. **SQLite metadata cache** — server-side resolution drops from ~3.4s to ~0.9s
2. **SQLite file store** — consistent read performance regardless of OS file cache state
3. **Streaming `/v1/install`** — file digests stream during resolution, downloads start before resolution finishes
4. **Gzip-streamed `/v1/files`** — whole-stream gzip (274MB → ~80MB), significant savings on remote servers
5. **Worker-thread streaming HTTP** — workers pipe gzip → parse → write to CAFS as data arrives, no buffering
6. **No rehashing** — server-provided digests used directly, skipping 33K SHA-512 computations
7. **No re-verification** — wrapped `fetchPackage` calls `readPkgFromCafs` with `verifyStoreIntegrity: false`
8. **Direct `writeFileSync` with `wx`** — no stat + temp + rename
9. **Pre-packed msgpack** — server sends raw store index buffers, client writes directly to SQLite
10. **WAL checkpoint** — ensures store index entries written by agent are visible to headless install's worker threads
## Usage
Start the server:
```bash
node agent/server/lib/bin.js
```
Configure in `pnpm-workspace.yaml`:
```yaml
agent: http://localhost:4873
```
191 lines
6.4 KiB
TypeScript
191 lines
6.4 KiB
TypeScript
import { promises as fs } from 'node:fs'
|
|
import http from 'node:http'
|
|
import os from 'node:os'
|
|
import path from 'node:path'
|
|
|
|
import { afterAll, beforeAll, describe, expect, it, jest } from '@jest/globals'
|
|
|
|
// First run downloads packages from registry-mock — slow on Windows CI
|
|
jest.setTimeout(600_000)
|
|
import { fetchFromPnpmRegistry } from '@pnpm/agent.client'
|
|
import { REGISTRY_MOCK_PORT } from '@pnpm/registry-mock'
|
|
import { StoreIndex } from '@pnpm/store.index'
|
|
import type { DepPath, ProjectId } from '@pnpm/types'
|
|
import { createRegistryServer } from 'pnpm-agent'
|
|
|
|
const REGISTRY = `http://localhost:${REGISTRY_MOCK_PORT}/`
|
|
|
|
describe('pnpm-agent integration', () => {
|
|
let server: http.Server
|
|
let serverPort: number
|
|
let serverStoreDir: string
|
|
let serverCacheDir: string
|
|
|
|
beforeAll(async () => {
|
|
// Create server store in a temp directory
|
|
const tmpBase = await fs.mkdtemp(path.join(os.tmpdir(), 'pnpm-agent-test-server-'))
|
|
serverStoreDir = path.join(tmpBase, 'store')
|
|
serverCacheDir = path.join(tmpBase, 'cache')
|
|
|
|
server = await createRegistryServer({
|
|
storeDir: serverStoreDir,
|
|
cacheDir: serverCacheDir,
|
|
registries: { default: REGISTRY },
|
|
})
|
|
|
|
// Listen on random port
|
|
await new Promise<void>((resolve) => {
|
|
server.listen(0, resolve)
|
|
})
|
|
serverPort = (server.address() as any).port // eslint-disable-line @typescript-eslint/no-explicit-any
|
|
})
|
|
|
|
afterAll(async () => {
|
|
const { finishWorkers } = await import('../../../worker/src/index.js')
|
|
await finishWorkers()
|
|
await new Promise<void>((resolve, reject) => {
|
|
server.close((err) => {
|
|
if (err) {
|
|
reject(err)
|
|
} else {
|
|
resolve()
|
|
}
|
|
})
|
|
})
|
|
await fs.rm(path.dirname(serverStoreDir), { recursive: true, force: true })
|
|
})
|
|
|
|
it('returns a lockfile with importers keyed by "."', async () => {
|
|
const tmpClient = await fs.mkdtemp(path.join(os.tmpdir(), 'pnpm-agent-test-importers-'))
|
|
const clientStoreDir = path.join(tmpClient, 'store')
|
|
await fs.mkdir(clientStoreDir, { recursive: true })
|
|
const clientStoreIndex = new StoreIndex(clientStoreDir)
|
|
|
|
try {
|
|
const result = await fetchFromPnpmRegistry({
|
|
registryUrl: `http://localhost:${serverPort}`,
|
|
storeDir: clientStoreDir,
|
|
storeIndex: clientStoreIndex,
|
|
dependencies: {
|
|
'is-positive': '1.0.0',
|
|
},
|
|
})
|
|
|
|
// The lockfile must have importers keyed by "." (not by the server's temp dir)
|
|
const importerKeys = Object.keys(result.lockfile.importers)
|
|
expect(importerKeys).toEqual(['.'])
|
|
|
|
// The "." importer must have specifiers and dependencies
|
|
const rootImporter = result.lockfile.importers['.' as ProjectId]
|
|
expect(rootImporter).toBeTruthy()
|
|
expect(rootImporter.specifiers).toBeTruthy()
|
|
expect(rootImporter.dependencies).toBeTruthy()
|
|
expect(rootImporter.dependencies?.['is-positive']).toBeTruthy()
|
|
await result.fileDownloads
|
|
} finally {
|
|
clientStoreIndex.close()
|
|
await fs.rm(tmpClient, { recursive: true, force: true })
|
|
}
|
|
})
|
|
|
|
it('resolves a single dependency and returns lockfile + files', async () => {
|
|
// Create a client store in a temp directory
|
|
const tmpClient = await fs.mkdtemp(path.join(os.tmpdir(), 'pnpm-agent-test-client-'))
|
|
const clientStoreDir = path.join(tmpClient, 'store')
|
|
await fs.mkdir(clientStoreDir, { recursive: true })
|
|
const clientStoreIndex = new StoreIndex(clientStoreDir)
|
|
|
|
try {
|
|
const result = await fetchFromPnpmRegistry({
|
|
registryUrl: `http://localhost:${serverPort}`,
|
|
storeDir: clientStoreDir,
|
|
storeIndex: clientStoreIndex,
|
|
dependencies: {
|
|
'is-positive': '1.0.0',
|
|
},
|
|
})
|
|
|
|
// Verify lockfile was returned
|
|
expect(result.lockfile).toBeTruthy()
|
|
expect(result.lockfile.lockfileVersion).toBeTruthy()
|
|
|
|
// Verify packages were resolved
|
|
const packages = result.lockfile.packages ?? {}
|
|
const depPaths = Object.keys(packages)
|
|
expect(depPaths.length).toBeGreaterThanOrEqual(1)
|
|
|
|
// Verify at least one package has a resolution with integrity
|
|
const hasIntegrity = depPaths.some(dp => {
|
|
const pkg = packages[dp as DepPath]
|
|
return pkg?.resolution && typeof pkg.resolution === 'object' && 'integrity' in pkg.resolution
|
|
})
|
|
expect(hasIntegrity).toBe(true)
|
|
|
|
// Verify stats
|
|
expect(result.stats.totalPackages).toBeGreaterThanOrEqual(1)
|
|
await result.fileDownloads
|
|
} finally {
|
|
clientStoreIndex.close()
|
|
await fs.rm(tmpClient, { recursive: true, force: true })
|
|
}
|
|
})
|
|
|
|
it('returns consistent lockfile on repeated requests', async () => {
|
|
const tmpClient = await fs.mkdtemp(path.join(os.tmpdir(), 'pnpm-agent-test-client2-'))
|
|
const clientStoreDir = path.join(tmpClient, 'store')
|
|
await fs.mkdir(clientStoreDir, { recursive: true })
|
|
const clientStoreIndex = new StoreIndex(clientStoreDir)
|
|
|
|
try {
|
|
const result1 = await fetchFromPnpmRegistry({
|
|
registryUrl: `http://localhost:${serverPort}`,
|
|
storeDir: clientStoreDir,
|
|
storeIndex: clientStoreIndex,
|
|
dependencies: {
|
|
'is-positive': '1.0.0',
|
|
},
|
|
})
|
|
|
|
const result2 = await fetchFromPnpmRegistry({
|
|
registryUrl: `http://localhost:${serverPort}`,
|
|
storeDir: clientStoreDir,
|
|
storeIndex: clientStoreIndex,
|
|
dependencies: {
|
|
'is-positive': '1.0.0',
|
|
},
|
|
})
|
|
|
|
// Same dependency → same lockfile
|
|
expect(Object.keys(result1.lockfile.packages ?? {})).toEqual(
|
|
Object.keys(result2.lockfile.packages ?? {})
|
|
)
|
|
|
|
// Wait for file downloads to complete before cleanup
|
|
await result1.fileDownloads
|
|
await result2.fileDownloads
|
|
} finally {
|
|
clientStoreIndex.close()
|
|
await fs.rm(tmpClient, { recursive: true, force: true })
|
|
}
|
|
})
|
|
|
|
it('returns 404 for unknown endpoints', async () => {
|
|
const result = await new Promise<{ statusCode: number, body: string }>((resolve, reject) => {
|
|
const req = http.request(`http://localhost:${serverPort}/v1/unknown`, {
|
|
method: 'POST',
|
|
}, (res) => {
|
|
const chunks: Buffer[] = []
|
|
res.on('data', (chunk: Buffer) => chunks.push(chunk))
|
|
res.on('end', () => resolve({
|
|
statusCode: res.statusCode!,
|
|
body: Buffer.concat(chunks).toString(),
|
|
}))
|
|
})
|
|
req.on('error', reject)
|
|
req.end()
|
|
})
|
|
|
|
expect(result.statusCode).toBe(404)
|
|
})
|
|
})
|