Merge remote-tracking branch 'origin/master' into max/04-10-web-push-neo

This commit is contained in:
Max Leiter
2026-04-12 13:13:24 -07:00
12 changed files with 327 additions and 915 deletions

View File

@@ -1,8 +1,8 @@
import type {Database} from "sqlite3";
import {DatabaseSync} from "node:sqlite";
import log from "../../log";
import path from "path";
import fs from "fs/promises";
import {mkdirSync} from "fs";
import Config from "../../config";
import Msg, {Message} from "../../models/msg";
import Chan, {Channel} from "../../models/chan";
@@ -11,19 +11,6 @@ import type {SearchableMessageStorage, DeletionRequest} from "./types";
import Network from "../../models/network";
import {SearchQuery, SearchResponse} from "../../../shared/types/storage";
// TODO; type
let sqlite3: any;
try {
sqlite3 = require("sqlite3");
} catch (e: any) {
Config.values.messageStorage = Config.values.messageStorage.filter((item) => item !== "sqlite");
log.error(
"Unable to load sqlite3 module. See https://github.com/mapbox/node-sqlite3/wiki/Binaries"
);
}
type Migration = {version: number; stmts: string[]};
type Rollback = {version: number; rollback_forbidden?: boolean; stmts: string[]};
@@ -103,35 +90,21 @@ export const rollbacks: Rollback[] = [
},
];
class Deferred {
resolve!: () => void;
promise: Promise<void>;
constructor() {
this.promise = new Promise((resolve) => {
this.resolve = resolve;
});
}
}
class SqliteMessageStorage implements SearchableMessageStorage {
isEnabled: boolean;
database!: Database;
initDone: Deferred;
database!: DatabaseSync;
userName: string;
constructor(userName: string) {
this.userName = userName;
this.isEnabled = false;
this.initDone = new Deferred();
}
async _enable(connection_string: string) {
this.database = new sqlite3.Database(connection_string);
_enable(connection_string: string) {
this.database = new DatabaseSync(connection_string);
try {
await this.run_pragmas(); // must be done outside of a transaction
await this.run_migrations();
this.run_migrations();
} catch (e) {
this.isEnabled = false;
throw Helper.catch_to_error("Migration failed", e);
@@ -140,46 +113,35 @@ class SqliteMessageStorage implements SearchableMessageStorage {
this.isEnabled = true;
}
async enable() {
enable() {
const logsPath = Config.getUserLogsPath();
const sqlitePath = path.join(logsPath, `${this.userName}.sqlite3`);
try {
await fs.mkdir(logsPath, {recursive: true});
} catch (e) {
throw Helper.catch_to_error("Unable to create logs directory", e);
}
try {
await this._enable(sqlitePath);
} finally {
this.initDone.resolve(); // unblock the instance methods
}
mkdirSync(logsPath, {recursive: true});
this._enable(sqlitePath);
}
async setup_new_db() {
setup_new_db() {
for (const stmt of schema) {
await this.serialize_run(stmt);
this.database.exec(stmt);
}
await this.serialize_run(
"INSERT INTO options (name, value) VALUES ('schema_version', ?)",
currentSchemaVersion.toString()
);
this.database
.prepare("INSERT INTO options (name, value) VALUES ('schema_version', ?)")
.run(currentSchemaVersion.toString());
}
async current_version(): Promise<number> {
const have_options = await this.serialize_get(
"select 1 from sqlite_master where type = 'table' and name = 'options'"
);
current_version(): number {
const have_options = this.database
.prepare("select 1 from sqlite_master where type = 'table' and name = 'options'")
.get();
if (!have_options) {
return 0;
}
const version = await this.serialize_get(
"SELECT value FROM options WHERE name = 'schema_version'"
);
const version = this.database
.prepare("SELECT value FROM options WHERE name = 'schema_version'")
.get() as {value: string} | undefined;
if (version === undefined) {
// technically shouldn't happen, means something created a schema but didn't populate it
@@ -191,14 +153,13 @@ class SqliteMessageStorage implements SearchableMessageStorage {
return storedSchemaVersion;
}
async update_version_in_db() {
return this.serialize_run(
"UPDATE options SET value = ? WHERE name = 'schema_version'",
currentSchemaVersion.toString()
);
update_version_in_db() {
this.database
.prepare("UPDATE options SET value = ? WHERE name = 'schema_version'")
.run(currentSchemaVersion.toString());
}
async _run_migrations(dbVersion: number) {
_run_migrations(dbVersion: number) {
log.info(
`sqlite messages schema version is out of date (${dbVersion} < ${currentSchemaVersion}). Running migrations.`
);
@@ -206,18 +167,14 @@ class SqliteMessageStorage implements SearchableMessageStorage {
const to_execute = necessaryMigrations(dbVersion);
for (const stmt of to_execute.map((m) => m.stmts).flat()) {
await this.serialize_run(stmt);
this.database.exec(stmt);
}
await this.update_version_in_db();
this.update_version_in_db();
}
async run_pragmas() {
await this.serialize_run("PRAGMA foreign_keys = ON;");
}
async run_migrations() {
const version = await this.current_version();
run_migrations() {
const version = this.current_version();
if (version > currentSchemaVersion) {
throw `sqlite messages schema version is higher than expected (${version} > ${currentSchemaVersion}). Is The Lounge out of date?`;
@@ -225,58 +182,54 @@ class SqliteMessageStorage implements SearchableMessageStorage {
return; // nothing to do
}
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION");
this.database.exec("BEGIN EXCLUSIVE TRANSACTION");
try {
if (version === 0) {
await this.setup_new_db();
this.setup_new_db();
} else {
await this._run_migrations(version);
this._run_migrations(version);
}
await this.insert_rollback_since(version);
this.insert_rollback_since(version);
} catch (err) {
await this.serialize_run("ROLLBACK");
this.database.exec("ROLLBACK");
throw err;
}
await this.serialize_run("COMMIT");
await this.serialize_run("VACUUM");
this.database.exec("COMMIT");
this.database.exec("VACUUM");
}
// helper method that vacuums the db, meant to be used by migration related cli commands
async vacuum() {
await this.serialize_run("VACUUM");
vacuum() {
this.database.exec("VACUUM");
}
async close() {
close() {
if (!this.isEnabled) {
return;
}
this.isEnabled = false;
return new Promise<void>((resolve, reject) => {
this.database.close((err) => {
if (err) {
reject(`Failed to close sqlite database: ${err.message}`);
return;
}
resolve();
});
});
this.database.close();
}
async fetch_rollbacks(since_version: number) {
const res = await this.serialize_fetchall(
`select version, rollback_forbidden, statement
from rollback_steps
join migrations on migrations.id=rollback_steps.migration_id
where version > ?
order by version desc, step asc`,
since_version
);
fetch_rollbacks(since_version: number): Rollback[] {
const res = this.database
.prepare(
`select version, rollback_forbidden, statement
from rollback_steps
join migrations on migrations.id=rollback_steps.migration_id
where version > ?
order by version desc, step asc`
)
.all(since_version) as {
version: number;
rollback_forbidden: number;
statement: string;
}[];
const result: Rollback[] = [];
// convert to Rollback[]
@@ -298,12 +251,12 @@ class SqliteMessageStorage implements SearchableMessageStorage {
return result;
}
async delete_migrations_older_than(version: number) {
return this.serialize_run("delete from migrations where migrations.version > ?", version);
delete_migrations_older_than(version: number) {
this.database.prepare("delete from migrations where migrations.version > ?").run(version);
}
async _downgrade_to(version: number) {
const _rollbacks = await this.fetch_rollbacks(version);
_downgrade_to(version: number): number {
const _rollbacks = this.fetch_rollbacks(version);
if (_rollbacks.length === 0) {
return version;
@@ -317,72 +270,69 @@ class SqliteMessageStorage implements SearchableMessageStorage {
for (const rollback of _rollbacks) {
for (const stmt of rollback.stmts) {
await this.serialize_run(stmt);
this.database.exec(stmt);
}
}
await this.delete_migrations_older_than(version);
await this.update_version_in_db();
this.delete_migrations_older_than(version);
this.update_version_in_db();
return version;
}
async downgrade_to(version: number) {
downgrade_to(version: number): number {
if (version <= 0) {
throw Error(`${version} is not a valid version to downgrade to`);
}
await this.serialize_run("BEGIN EXCLUSIVE TRANSACTION");
this.database.exec("BEGIN EXCLUSIVE TRANSACTION");
let new_version: number;
try {
new_version = await this._downgrade_to(version);
new_version = this._downgrade_to(version);
} catch (err) {
await this.serialize_run("ROLLBACK");
this.database.exec("ROLLBACK");
throw err;
}
await this.serialize_run("COMMIT");
this.database.exec("COMMIT");
return new_version;
}
async downgrade() {
const res = await this.downgrade_to(currentSchemaVersion);
return res;
downgrade() {
return this.downgrade_to(currentSchemaVersion);
}
async insert_rollback_since(version: number) {
insert_rollback_since(version: number) {
const missing = newRollbacks(version);
for (const rollback of missing) {
const migration = await this.serialize_get(
`insert into migrations
(version, rollback_forbidden)
values (?, ?)
returning id`,
rollback.version,
rollback.rollback_forbidden || 0
);
const migration = this.database
.prepare(
`insert into migrations
(version, rollback_forbidden)
values (?, ?)
returning id`
)
.get(rollback.version, rollback.rollback_forbidden ? 1 : 0) as {id: number};
let step = 0;
for (const stmt of rollback.stmts) {
let step = 0;
await this.serialize_run(
`insert into rollback_steps
(migration_id, step, statement)
values (?, ?, ?)`,
migration.id,
step,
stmt
);
this.database
.prepare(
`insert into rollback_steps
(migration_id, step, statement)
values (?, ?, ?)`
)
.run(migration.id, step, stmt);
step++;
}
}
}
async index(network: Network, channel: Chan, msg: Msg) {
await this.initDone.promise;
index(network: Network, channel: Chan, msg: Msg) {
if (!this.isEnabled) {
return;
}
@@ -398,38 +348,30 @@ class SqliteMessageStorage implements SearchableMessageStorage {
return newMsg;
}, {});
await this.serialize_run(
"INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)",
network.uuid,
channel.name.toLowerCase(),
msg.time.getTime(),
msg.type,
JSON.stringify(clonedMsg)
);
this.database
.prepare(
"INSERT INTO messages(network, channel, time, type, msg) VALUES(?, ?, ?, ?, ?)"
)
.run(
network.uuid,
channel.name.toLowerCase(),
msg.time.getTime(),
msg.type,
JSON.stringify(clonedMsg)
);
}
async deleteChannel(network: Network, channel: Channel) {
await this.initDone.promise;
deleteChannel(network: Network, channel: Channel) {
if (!this.isEnabled) {
return;
}
await this.serialize_run(
"DELETE FROM messages WHERE network = ? AND channel = ?",
network.uuid,
channel.name.toLowerCase()
);
this.database
.prepare("DELETE FROM messages WHERE network = ? AND channel = ?")
.run(network.uuid, channel.name.toLowerCase());
}
async getMessages(
network: Network,
channel: Channel,
nextID: () => number
): Promise<Message[]> {
await this.initDone.promise;
getMessages(network: Network, channel: Channel, nextID: () => number): Message[] {
if (!this.isEnabled || Config.values.maxHistory === 0) {
return [];
}
@@ -437,14 +379,17 @@ class SqliteMessageStorage implements SearchableMessageStorage {
// If unlimited history is specified, load 100k messages
const limit = Config.values.maxHistory < 0 ? 100000 : Config.values.maxHistory;
const rows = await this.serialize_fetchall(
"SELECT msg, type, time FROM messages WHERE network = ? AND channel = ? ORDER BY time DESC LIMIT ?",
network.uuid,
channel.name.toLowerCase(),
limit
);
const rows = this.database
.prepare(
"SELECT msg, type, time FROM messages WHERE network = ? AND channel = ? ORDER BY time DESC LIMIT ?"
)
.all(network.uuid, channel.name.toLowerCase(), limit) as {
msg: string;
type: string;
time: number;
}[];
return rows.reverse().map((row: any): Message => {
return rows.reverse().map((row): Message => {
const msg = JSON.parse(row.msg);
msg.time = row.time;
msg.type = row.type;
@@ -456,9 +401,7 @@ class SqliteMessageStorage implements SearchableMessageStorage {
});
}
async search(query: SearchQuery): Promise<SearchResponse> {
await this.initDone.promise;
search(query: SearchQuery): SearchResponse {
if (!this.isEnabled) {
// this should never be hit as messageProvider is checked in client.search()
throw new Error(
@@ -471,7 +414,7 @@ class SqliteMessageStorage implements SearchableMessageStorage {
let select =
"SELECT msg, type, time, network, channel FROM messages WHERE type = 'message' AND json_extract(msg, '$.text') LIKE ? ESCAPE '@'";
const params: any[] = [`%${escapedSearchTerm}%`];
const params: (string | number)[] = [`%${escapedSearchTerm}%`];
if (query.networkUuid) {
select += " AND network = ? ";
@@ -489,15 +432,21 @@ class SqliteMessageStorage implements SearchableMessageStorage {
params.push(maxResults);
params.push(query.offset);
const rows = await this.serialize_fetchall(select, ...params);
const rows = this.database.prepare(select).all(...params) as {
msg: string;
type: string;
time: number;
network: string;
channel: string;
}[];
return {
...query,
results: parseSearchRowsToMessages(query.offset, rows).reverse(),
};
}
async deleteMessages(req: DeletionRequest): Promise<number> {
await this.initDone.promise;
deleteMessages(req: DeletionRequest): number {
let sql = "delete from messages where id in (select id from messages where\n";
// We roughly get a timestamp from N days before.
@@ -520,61 +469,19 @@ class SqliteMessageStorage implements SearchableMessageStorage {
sql += `limit ${req.limit}\n`;
sql += ")";
return this.serialize_run(sql);
return this.database.prepare(sql).run().changes as number;
}
canProvideMessages() {
return this.isEnabled;
}
private serialize_run(stmt: string, ...params: any[]): Promise<number> {
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.run(stmt, params, function (err) {
if (err) {
reject(err);
return;
}
resolve(this.changes); // number of affected rows, `this` is re-bound by sqlite3
});
});
});
}
private serialize_fetchall(stmt: string, ...params: any[]): Promise<any[]> {
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.all(stmt, params, (err, rows) => {
if (err) {
reject(err);
return;
}
resolve(rows);
});
});
});
}
private serialize_get(stmt: string, ...params: any[]): Promise<any> {
return new Promise((resolve, reject) => {
this.database.serialize(() => {
this.database.get(stmt, params, (err, row) => {
if (err) {
reject(err);
return;
}
resolve(row);
});
});
});
}
}
// TODO: type any
function parseSearchRowsToMessages(id: number, rows: any[]) {
function parseSearchRowsToMessages(
id: number,
rows: {msg: string; type: string; time: number; network: string; channel: string}[]
) {
const messages: Msg[] = [];
for (const row of rows) {

View File

@@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import fs from "fs/promises";
import {mkdirSync, appendFileSync} from "fs";
import path from "path";
import filenamify from "filenamify";
@@ -19,17 +18,15 @@ class TextFileMessageStorage implements MessageStorage {
this.isEnabled = false;
}
// eslint-disable-next-line @typescript-eslint/require-await
async enable() {
enable() {
this.isEnabled = true;
}
// eslint-disable-next-line @typescript-eslint/require-await
async close() {
close() {
this.isEnabled = false;
}
async index(network: Network, channel: Channel, msg: Message) {
index(network: Network, channel: Channel, msg: Message) {
if (!this.isEnabled) {
return;
}
@@ -40,11 +37,7 @@ class TextFileMessageStorage implements MessageStorage {
TextFileMessageStorage.getNetworkFolderName(network)
);
try {
await fs.mkdir(logPath, {recursive: true});
} catch (e) {
throw new Error(`Unable to create logs directory: ${e}`);
}
mkdirSync(logPath, {recursive: true});
let line = `[${msg.time.toISOString()}] `;
@@ -102,25 +95,21 @@ class TextFileMessageStorage implements MessageStorage {
line += "\n";
try {
await fs.appendFile(
path.join(logPath, TextFileMessageStorage.getChannelFileName(channel)),
line
);
} catch (e) {
throw new Error(`Failed to write user log: ${e}`);
}
appendFileSync(
path.join(logPath, TextFileMessageStorage.getChannelFileName(channel)),
line
);
}
async deleteChannel() {
deleteChannel() {
// Not implemented for text log files
}
getMessages() {
getMessages(): Message[] {
// Not implemented for text log files
// They do not contain enough data to fully re-create message objects
// Use sqlite storage instead
return Promise.resolve([]);
return [];
}
canProvideMessages() {

View File

@@ -1,5 +1,3 @@
import type {Database} from "sqlite3";
import {Channel} from "../../models/channel";
import {Message} from "../../models/message";
import {Network} from "../../models/network";
@@ -16,20 +14,20 @@ export type DeletionRequest = {
interface MessageStorage {
isEnabled: boolean;
enable(): Promise<void>;
enable(): void;
close(): Promise<void>;
close(): void;
index(network: Network, channel: Channel, msg: Message): Promise<void>;
index(network: Network, channel: Channel, msg: Message): void;
deleteChannel(network: Network, channel: Channel): Promise<void>;
deleteChannel(network: Network, channel: Channel): void;
getMessages(network: Network, channel: Channel, nextID: () => number): Promise<Message[]>;
getMessages(network: Network, channel: Channel, nextID: () => number): Message[];
canProvideMessages(): boolean;
}
type SearchFunction = (query: SearchQuery) => Promise<SearchResponse>;
type SearchFunction = (query: SearchQuery) => SearchResponse;
export interface SearchableMessageStorage extends MessageStorage {
search: SearchFunction;