import { Client, ClientConfig } from "mysql";
import {
CompiledQuery,
DatabaseConnection,
Driver,
QueryResult,
TransactionSettings,
} from "kysely";
// Bind parameters in the mutable-array form that is passed to the client's
// `query` call (Kysely's compiled parameters are cast to this type there).
type QueryArguments = unknown[];
/**
 * Small pool of idle {@link MySQLConnection}s.
 *
 * `maxConnections` caps how many *idle* connections are kept for reuse. It
 * does not limit how many connections may be checked out at the same time:
 * `acquire()` opens a new connection whenever no idle one is available.
 */
class ConnectionPool {
  private readonly connections: MySQLConnection[] = [];
  private readonly maxConnections: number;
  private readonly db_conn_info: ClientConfig;
  /** Set by `destroy()`; connections released afterwards are closed, not pooled. */
  private destroyed = false;

  /**
   * @param db_conn_info Client configuration used for every new connection.
   * @param maxConnections Maximum number of idle connections to retain.
   */
  constructor(db_conn_info: ClientConfig, maxConnections: number) {
    this.db_conn_info = db_conn_info;
    this.maxConnections = maxConnections;
  }

  /**
   * Hands out the most recently released idle connection, or opens a new one
   * when the pool is empty.
   *
   * NOTE(review): the underlying client appears to keep its own internal
   * pool (`poolSize` in its config) — confirm that one `Client` pins a single
   * physical connection, otherwise a transaction could span connections.
   *
   * @returns A connection owned by the caller until it is passed to `release()`.
   */
  async acquire(): Promise<MySQLConnection> {
    const idle = this.connections.pop();
    if (idle !== undefined) {
      return idle;
    }
    const client = await new Client().connect(this.db_conn_info);
    return new MySQLConnection(client);
  }

  /**
   * Returns a connection to the pool, or closes it when it cannot be kept.
   *
   * @param connection Connection previously obtained from `acquire()`.
   */
  release(connection: MySQLConnection): void {
    // A connection that is already idle must not be stored twice; otherwise
    // two later `acquire()` calls would hand the same connection to two users.
    if (this.connections.includes(connection)) {
      return;
    }
    const canKeep =
      !this.destroyed && this.connections.length < this.maxConnections;
    if (canKeep) {
      this.connections.push(connection);
    } else {
      // Pool is full or already destroyed: keeping it would leak the connection.
      connection.close();
    }
  }

  /**
   * Closes every idle connection and marks the pool as destroyed so that
   * connections released later are closed instead of being retained.
   */
  async destroy(): Promise<void> {
    this.destroyed = true;
    // Detach the idle list first so a concurrent release() cannot see
    // connections that are in the middle of being closed.
    const idle = this.connections.splice(0);
    // Awaiting is harmless if close() is synchronous and waits for completion
    // if it returns a promise.
    await Promise.all(idle.map((connection) => connection.close()));
  }
}
/**
 * Kysely {@link Driver} that runs queries through connections obtained from
 * an internal {@link ConnectionPool}.
 */
export class MySQLDriver implements Driver {
  db_conn_info: ClientConfig;
  private readonly connectionPool: ConnectionPool;

  /**
   * @param conn_info Client configuration used to open connections.
   * @param maxConnections Maximum number of idle connections the pool retains
   *   (defaults to 10).
   */
  constructor(conn_info: ClientConfig, maxConnections: number = 10) {
    this.db_conn_info = conn_info;
    this.connectionPool = new ConnectionPool(this.db_conn_info, maxConnections);
  }

  /** No-op: the pool opens connections lazily on `acquireConnection()`. */
  async init(): Promise<void> {
    // Intentionally empty.
  }

  /** Obtains a connection from the pool. */
  async acquireConnection(): Promise<DatabaseConnection> {
    return this.connectionPool.acquire();
  }

  /**
   * Starts a transaction on `connection`.
   *
   * When an isolation level is requested it is applied with
   * `set transaction isolation level …` immediately before the transaction
   * is started, which in MySQL affects only the next transaction.
   *
   * @param connection Connection the transaction runs on.
   * @param settings Transaction options supplied by Kysely.
   */
  async beginTransaction(
    connection: DatabaseConnection,
    settings: TransactionSettings,
  ): Promise<void> {
    if (settings.isolationLevel) {
      // `isolationLevel` is a closed union of SQL keywords from Kysely's
      // typings, not free-form user text, so interpolating it is safe.
      await connection.executeQuery(
        CompiledQuery.raw(
          `set transaction isolation level ${settings.isolationLevel}`,
        ),
      );
    }
    await connection.executeQuery(CompiledQuery.raw("start transaction"));
  }

  /** Commits the transaction currently open on `connection`. */
  async commitTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("commit"));
  }

  /** Rolls back the transaction currently open on `connection`. */
  async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("rollback"));
  }

  /**
   * Gives a connection back to the pool.
   *
   * Declared `async` so that a synchronous failure while releasing surfaces
   * as a rejected promise rather than a thrown exception.
   *
   * @param connection Connection previously returned by `acquireConnection()`.
   */
  async releaseConnection(connection: DatabaseConnection): Promise<void> {
    // Every connection this driver hands out is created by the pool as a
    // MySQLConnection, hence the downcast.
    this.connectionPool.release(connection as MySQLConnection);
  }

  /** Shuts the pool down, closing its idle connections. */
  destroy(): Promise<void> {
    return this.connectionPool.destroy();
  }
}
/**
 * Adapts a single `mysql` {@link Client} to Kysely's
 * {@link DatabaseConnection} interface.
 */
class MySQLConnection implements DatabaseConnection {
  readonly #db: Client;

  /** @param c Connected client that this wrapper takes ownership of. */
  constructor(c: Client) {
    this.#db = c;
  }

  /**
   * Executes a compiled query and converts the client's result into a Kysely
   * {@link QueryResult}.
   *
   * An array result is returned as `rows` unchanged. Any other result is
   * treated as a write summary: `rows` is empty, and `affectedRows` /
   * `lastInsertId` are reported as `numAffectedRows` / `insertId` when present.
   *
   * @param compiledQuery SQL text plus bind parameters produced by Kysely.
   * @returns The rows and, for write statements, the affected-row metadata.
   */
  async executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>> {
    const { sql, parameters } = compiledQuery;
    const result: unknown = await this.#db.query(
      sql,
      parameters as QueryArguments,
    );

    if (Array.isArray(result)) {
      return { rows: result as R[] };
    }

    // Not a result set. deno_mysql is expected to resolve to an object
    // carrying `affectedRows` / `lastInsertId` here; `rows` must still be an
    // array for Kysely, so it is reported as empty.
    const summary = (result ?? {}) as {
      affectedRows?: number | null;
      lastInsertId?: number | null;
    };
    const outcome: { rows: R[]; numAffectedRows?: bigint; insertId?: bigint } =
      { rows: [] };
    if (summary.affectedRows != null) {
      outcome.numAffectedRows = BigInt(summary.affectedRows);
    }
    // An insert id of 0 means no auto-increment value was generated.
    if (summary.lastInsertId != null && summary.lastInsertId !== 0) {
      outcome.insertId = BigInt(summary.lastInsertId);
    }
    return outcome;
  }

  /**
   * Streaming is not implemented; iterating the returned iterator throws.
   *
   * @throws Error always, on first iteration.
   */
  async *streamQuery<R>(
    _compiledQuery: CompiledQuery,
    _chunkSize?: number,
  ): AsyncIterableIterator<QueryResult<R>> {
    // Streaming queries are not yet supported by the deno_mysql library.
    throw new Error("Streaming queries are not supported with MySQL driver.");
  }

  /**
   * Closes the underlying client.
   *
   * The returned promise settles once the client's own `close()` has
   * finished, so callers may await shutdown; callers that ignore the return
   * value keep working as before.
   */
  async close(): Promise<void> {
    await this.#db.close();
  }
}