Untitled

 avatar
unknown
javascript
2 years ago
2.6 kB
98
Indexable
import { Client, ClientConfig } from "mysql";
import {
  CompiledQuery,
  DatabaseConnection,
  Driver,
  QueryResult,
  TransactionSettings,
} from "kysely";

type QueryArguments = unknown[];

export class MySQLDriver implements Driver {
  readonly #connectionMutex = new ConnectionMutex();

  #client?: Client;
  #connection?: DatabaseConnection;

  db_conn_info: ClientConfig; // MySQL connection info

  constructor(conn_info: ClientConfig) {
    this.db_conn_info = conn_info;
  }

  async init(): Promise<void> {
    this.#client = await new Client().connect(this.db_conn_info);
    this.#connection = new MySQLConnection(this.#client);
  }

  async acquireConnection(): Promise<DatabaseConnection> {
    await this.#connectionMutex.lock();
    return this.#connection!;
  }

  async beginTransaction(
    connection: DatabaseConnection,
    _settings: TransactionSettings,
  ): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("start transaction"));
  }

  async commitTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("commit"));
  }

  async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw("rollback"));
  }

  releaseConnection(): Promise<void> {
    this.#connectionMutex.unlock();
    return Promise.resolve();
  }

  destroy(): Promise<void> {
    this.#client?.close();
    return Promise.resolve();
  }
}

class MySQLConnection implements DatabaseConnection {
  readonly #db: Client;

  constructor(c: Client) {
    this.#db = c;
  }

  async executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>> {
    const { sql, parameters } = compiledQuery;

    const rows = await this.#db.query(sql, parameters as QueryArguments);

    return Promise.resolve({
      rows: rows as [],
    });
  }

  async *streamQuery<R>(
    _compiledQuery: CompiledQuery,
    _chunkSize?: number,
  ): AsyncIterableIterator<QueryResult<R>> {
    // Streaming queries are not yet supported by the deno_mysql library.
    throw new Error("Streaming queries are not supported with MySQL driver.");
  }
}

class ConnectionMutex {
  #promise?: Promise<void>;
  #resolve?: () => void;

  async lock(): Promise<void> {
    while (this.#promise) {
      await this.#promise;
    }

    this.#promise = new Promise((resolve) => {
      this.#resolve = resolve;
    });
  }

  unlock(): void {
    const resolve = this.#resolve;

    this.#promise = undefined;
    this.#resolve = undefined;

    resolve?.();
  }
}
Editor is loading...