Untitled

 avatar
unknown
javascript
2 years ago
3.1 kB
22
Indexable
import { Client, ClientConfig } from "mysql";
import {
  CompiledQuery,
  DatabaseConnection,
  Driver,
  QueryResult,
  TransactionSettings,
} from "kysely";

type QueryArguments = unknown[];

class ConnectionPool {
  private readonly connections: MySQLConnection[] = [];
  private readonly maxConnections: number;
  private readonly db_conn_info: ClientConfig;

  constructor(db_conn_info: ClientConfig, maxConnections: number) {
    this.db_conn_info = db_conn_info;
    this.maxConnections = maxConnections;
  }

  async acquire(): Promise<MySQLConnection> {
    if (this.connections.length > 0) {
      return this.connections.pop()!;
    }

    const client = await new Client().connect(this.db_conn_info);
    return new MySQLConnection(client);
  }

  release(connection: MySQLConnection): void {
    if (this.connections.length < this.maxConnections) {
      this.connections.push(connection);
    } else {
      connection.close();
    }
  }

  async destroy(): Promise<void> {
    for (const connection of this.connections) {
      connection.close();
    }

    this.connections.length = 0;
  }
}

export class MySQLDriver implements Driver {
  db_conn_info: ClientConfig;
  private readonly connectionPool: ConnectionPool;

  constructor(conn_info: ClientConfig, maxConnections: number = 10) {
    this.db_conn_info = conn_info;
    this.connectionPool = new ConnectionPool(this.db_conn_info, maxConnections);
  }

  async init(): Promise<void> {
    // No need for init() as we use the connection pool
  }

  async acquireConnection(): Promise<DatabaseConnection> {
    return this.connectionPool.acquire();
  }

  async beginTransaction(
    connection: DatabaseConnection,
    _settings: TransactionSettings,
  ): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("start transaction"));
  }

  async commitTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("commit"));
  }

  async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("rollback"));
  }

  releaseConnection(connection: DatabaseConnection): Promise<void> {
    this.connectionPool.release(connection as MySQLConnection);
    return Promise.resolve();
  }

  destroy(): Promise<void> {
    return this.connectionPool.destroy();
  }
}

class MySQLConnection implements DatabaseConnection {
  readonly #db: Client;

  constructor(c: Client) {
    this.#db = c;
  }

  async executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>> {
    const { sql, parameters } = compiledQuery;

    const rows = await this.#db.query(sql, parameters as QueryArguments);

    return Promise.resolve({
      rows: rows as [],
    });
  }

  async *streamQuery<R>(
    _compiledQuery: CompiledQuery,
    _chunkSize?: number,
  ): AsyncIterableIterator<QueryResult<R>> {
    // Streaming queries are not yet supported by the deno_mysql library.
    throw new Error("Streaming queries are not supported with MySQL driver.");
  }

  close(): void {
    this.#db.close();
  }
}