From 04e5297d2ea5b45b32e01edaff97a7bd29ba6229 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 10:33:33 -0500 Subject: [PATCH 01/12] Convert more things to ES6 classes --- packages/pg/lib/client.js | 939 ++++++++++++----------- packages/pg/lib/connection-parameters.js | 206 ++--- packages/pg/lib/result.js | 215 ++++-- 3 files changed, 720 insertions(+), 640 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 93dfc6c9c..fd9ecad19 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -19,576 +19,579 @@ var Query = require('./query') var defaults = require('./defaults') var Connection = require('./connection') -var Client = function (config) { - EventEmitter.call(this) - - this.connectionParameters = new ConnectionParameters(config) - this.user = this.connectionParameters.user - this.database = this.connectionParameters.database - this.port = this.connectionParameters.port - this.host = this.connectionParameters.host - - // "hiding" the password so it doesn't show up in stack traces - // or if the client is console.logged - Object.defineProperty(this, 'password', { - configurable: true, - enumerable: false, - writable: true, - value: this.connectionParameters.password, - }) - - this.replication = this.connectionParameters.replication - - var c = config || {} - - this._Promise = c.Promise || global.Promise - this._types = new TypeOverrides(c.types) - this._ending = false - this._connecting = false - this._connected = false - this._connectionError = false - this._queryable = true - - this.connection = - c.connection || - new Connection({ - stream: c.stream, - ssl: this.connectionParameters.ssl, - keepAlive: c.keepAlive || false, - keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, - encoding: this.connectionParameters.client_encoding || 'utf8', +class Client extends EventEmitter { + constructor(config) { + super() + + this.connectionParameters = new ConnectionParameters(config) + this.user = this.connectionParameters.user + this.database = this.connectionParameters.database + this.port = this.connectionParameters.port + this.host = this.connectionParameters.host + + // "hiding" the password so it doesn't show up in stack traces + // or if the client is console.logged + Object.defineProperty(this, 'password', { + configurable: true, + enumerable: false, + writable: true, + value: this.connectionParameters.password, }) - this.queryQueue = [] - this.binary = c.binary || defaults.binary - this.processID = null - this.secretKey = null - this.ssl = this.connectionParameters.ssl || false - this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 -} - -util.inherits(Client, EventEmitter) -Client.prototype._errorAllQueries = function (err) { - const enqueueError = (query) => { - process.nextTick(() => { - query.handleError(err, this.connection) - }) + this.replication = this.connectionParameters.replication + + var c = config || {} + + this._Promise = c.Promise || global.Promise + this._types = new TypeOverrides(c.types) + this._ending = false + this._connecting = false + this._connected = false + this._connectionError = false + this._queryable = true + + this.connection = + c.connection || + new Connection({ + stream: c.stream, + ssl: this.connectionParameters.ssl, + keepAlive: c.keepAlive || false, + keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, + encoding: this.connectionParameters.client_encoding || 'utf8', + }) + this.queryQueue = [] + this.binary = c.binary || defaults.binary + this.processID = null + this.secretKey = null + this.ssl = this.connectionParameters.ssl || false + this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 } - if (this.activeQuery) { - enqueueError(this.activeQuery) - this.activeQuery = null - } + _errorAllQueries(err) { + const enqueueError = (query) => { + process.nextTick(() => { + query.handleError(err, this.connection) + }) + } - this.queryQueue.forEach(enqueueError) - this.queryQueue.length = 0 -} + if (this.activeQuery) { + enqueueError(this.activeQuery) + this.activeQuery = null + } -Client.prototype._connect = function (callback) { - var self = this - var con = this.connection - if (this._connecting || this._connected) { - const err = new Error('Client has already been connected. You cannot reuse a client.') - process.nextTick(() => { - callback(err) - }) - return - } - this._connecting = true - - var connectionTimeoutHandle - if (this._connectionTimeoutMillis > 0) { - connectionTimeoutHandle = setTimeout(() => { - con._ending = true - con.stream.destroy(new Error('timeout expired')) - }, this._connectionTimeoutMillis) + this.queryQueue.forEach(enqueueError) + this.queryQueue.length = 0 } - if (this.host && this.host.indexOf('/') === 0) { - con.connect(this.host + '/.s.PGSQL.' + this.port) - } else { - con.connect(this.port, this.host) - } + _connect(callback) { + var self = this + var con = this.connection + if (this._connecting || this._connected) { + const err = new Error('Client has already been connected. You cannot reuse a client.') + process.nextTick(() => { + callback(err) + }) + return + } + this._connecting = true + + var connectionTimeoutHandle + if (this._connectionTimeoutMillis > 0) { + connectionTimeoutHandle = setTimeout(() => { + con._ending = true + con.stream.destroy(new Error('timeout expired')) + }, this._connectionTimeoutMillis) + } - // once connection is established send startup message - con.on('connect', function () { - if (self.ssl) { - con.requestSsl() + if (this.host && this.host.indexOf('/') === 0) { + con.connect(this.host + '/.s.PGSQL.' + this.port) } else { - con.startup(self.getStartupConf()) + con.connect(this.port, this.host) } - }) - - con.on('sslconnect', function () { - con.startup(self.getStartupConf()) - }) - - function checkPgPass(cb) { - return function (msg) { - if (typeof self.password === 'function') { - self._Promise - .resolve() - .then(() => self.password()) - .then((pass) => { - if (pass !== undefined) { - if (typeof pass !== 'string') { - con.emit('error', new TypeError('Password must be a string')) - return + + // once connection is established send startup message + con.on('connect', function () { + if (self.ssl) { + con.requestSsl() + } else { + con.startup(self.getStartupConf()) + } + }) + + con.on('sslconnect', function () { + con.startup(self.getStartupConf()) + }) + + function checkPgPass(cb) { + return function (msg) { + if (typeof self.password === 'function') { + self._Promise + .resolve() + .then(() => self.password()) + .then((pass) => { + if (pass !== undefined) { + if (typeof pass !== 'string') { + con.emit('error', new TypeError('Password must be a string')) + return + } + self.connectionParameters.password = self.password = pass + } else { + self.connectionParameters.password = self.password = null } + cb(msg) + }) + .catch((err) => { + con.emit('error', err) + }) + } else if (self.password !== null) { + cb(msg) + } else { + pgPass(self.connectionParameters, function (pass) { + if (undefined !== pass) { self.connectionParameters.password = self.password = pass - } else { - self.connectionParameters.password = self.password = null } cb(msg) }) - .catch((err) => { - con.emit('error', err) - }) - } else if (self.password !== null) { - cb(msg) - } else { - pgPass(self.connectionParameters, function (pass) { - if (undefined !== pass) { - self.connectionParameters.password = self.password = pass - } - cb(msg) - }) + } } } - } - // password request handling - con.on( - 'authenticationCleartextPassword', - checkPgPass(function () { - con.password(self.password) - }) - ) + // password request handling + con.on( + 'authenticationCleartextPassword', + checkPgPass(function () { + con.password(self.password) + }) + ) - // password request handling - con.on( - 'authenticationMD5Password', - checkPgPass(function (msg) { - con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) - }) - ) + // password request handling + con.on( + 'authenticationMD5Password', + checkPgPass(function (msg) { + con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) + }) + ) - // password request handling (SASL) - var saslSession - con.on( - 'authenticationSASL', - checkPgPass(function (msg) { - saslSession = sasl.startSession(msg.mechanisms) + // password request handling (SASL) + var saslSession + con.on( + 'authenticationSASL', + checkPgPass(function (msg) { + saslSession = sasl.startSession(msg.mechanisms) - con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) - }) - ) + con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) + }) + ) - // password request handling (SASL) - con.on('authenticationSASLContinue', function (msg) { - sasl.continueSession(saslSession, self.password, msg.data) + // password request handling (SASL) + con.on('authenticationSASLContinue', function (msg) { + sasl.continueSession(saslSession, self.password, msg.data) - con.sendSCRAMClientFinalMessage(saslSession.response) - }) + con.sendSCRAMClientFinalMessage(saslSession.response) + }) - // password request handling (SASL) - con.on('authenticationSASLFinal', function (msg) { - sasl.finalizeSession(saslSession, msg.data) + // password request handling (SASL) + con.on('authenticationSASLFinal', function (msg) { + sasl.finalizeSession(saslSession, msg.data) - saslSession = null - }) + saslSession = null + }) - con.once('backendKeyData', function (msg) { - self.processID = msg.processID - self.secretKey = msg.secretKey - }) + con.once('backendKeyData', function (msg) { + self.processID = msg.processID + self.secretKey = msg.secretKey + }) - const connectingErrorHandler = (err) => { - if (this._connectionError) { - return + const connectingErrorHandler = (err) => { + if (this._connectionError) { + return + } + this._connectionError = true + clearTimeout(connectionTimeoutHandle) + if (callback) { + return callback(err) + } + this.emit('error', err) } - this._connectionError = true - clearTimeout(connectionTimeoutHandle) - if (callback) { - return callback(err) + + const connectedErrorHandler = (err) => { + this._queryable = false + this._errorAllQueries(err) + this.emit('error', err) } - this.emit('error', err) - } - const connectedErrorHandler = (err) => { - this._queryable = false - this._errorAllQueries(err) - this.emit('error', err) - } + const connectedErrorMessageHandler = (msg) => { + const activeQuery = this.activeQuery - const connectedErrorMessageHandler = (msg) => { - const activeQuery = this.activeQuery + if (!activeQuery) { + connectedErrorHandler(msg) + return + } - if (!activeQuery) { - connectedErrorHandler(msg) - return + this.activeQuery = null + activeQuery.handleError(msg, con) } - this.activeQuery = null - activeQuery.handleError(msg, con) - } + con.on('error', connectingErrorHandler) + con.on('errorMessage', connectingErrorHandler) + + // hook up query handling events to connection + // after the connection initially becomes ready for queries + con.once('readyForQuery', function () { + self._connecting = false + self._connected = true + self._attachListeners(con) + con.removeListener('error', connectingErrorHandler) + con.removeListener('errorMessage', connectingErrorHandler) + con.on('error', connectedErrorHandler) + con.on('errorMessage', connectedErrorMessageHandler) + clearTimeout(connectionTimeoutHandle) + + // process possible callback argument to Client#connect + if (callback) { + callback(null, self) + // remove callback for proper error handling + // after the connect event + callback = null + } + self.emit('connect') + }) - con.on('error', connectingErrorHandler) - con.on('errorMessage', connectingErrorHandler) - - // hook up query handling events to connection - // after the connection initially becomes ready for queries - con.once('readyForQuery', function () { - self._connecting = false - self._connected = true - self._attachListeners(con) - con.removeListener('error', connectingErrorHandler) - con.removeListener('errorMessage', connectingErrorHandler) - con.on('error', connectedErrorHandler) - con.on('errorMessage', connectedErrorMessageHandler) - clearTimeout(connectionTimeoutHandle) - - // process possible callback argument to Client#connect - if (callback) { - callback(null, self) - // remove callback for proper error handling - // after the connect event - callback = null - } - self.emit('connect') - }) - - con.on('readyForQuery', function () { - var activeQuery = self.activeQuery - self.activeQuery = null - self.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(con) - } - self._pulseQueryQueue() - }) - - con.once('end', () => { - const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') - - clearTimeout(connectionTimeoutHandle) - this._errorAllQueries(error) - - if (!this._ending) { - // if the connection is ended without us calling .end() - // on this client then we have an unexpected disconnection - // treat this as an error unless we've already emitted an error - // during connection. - if (this._connecting && !this._connectionError) { - if (callback) { - callback(error) - } else { + con.on('readyForQuery', function () { + var activeQuery = self.activeQuery + self.activeQuery = null + self.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(con) + } + self._pulseQueryQueue() + }) + + con.once('end', () => { + const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') + + clearTimeout(connectionTimeoutHandle) + this._errorAllQueries(error) + + if (!this._ending) { + // if the connection is ended without us calling .end() + // on this client then we have an unexpected disconnection + // treat this as an error unless we've already emitted an error + // during connection. + if (this._connecting && !this._connectionError) { + if (callback) { + callback(error) + } else { + connectedErrorHandler(error) + } + } else if (!this._connectionError) { connectedErrorHandler(error) } - } else if (!this._connectionError) { - connectedErrorHandler(error) } - } - process.nextTick(() => { - this.emit('end') + process.nextTick(() => { + this.emit('end') + }) }) - }) - con.on('notice', function (msg) { - self.emit('notice', msg) - }) -} + con.on('notice', function (msg) { + self.emit('notice', msg) + }) + } -Client.prototype.connect = function (callback) { - if (callback) { - this._connect(callback) - return + connect(callback) { + if (callback) { + this._connect(callback) + return + } + + return new this._Promise((resolve, reject) => { + this._connect((error) => { + if (error) { + reject(error) + } else { + resolve() + } + }) + }) } - return new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) { - reject(error) - } else { - resolve() - } + _attachListeners(con) { + const self = this + // delegate rowDescription to active query + con.on('rowDescription', function (msg) { + self.activeQuery.handleRowDescription(msg) }) - }) -} -Client.prototype._attachListeners = function (con) { - const self = this - // delegate rowDescription to active query - con.on('rowDescription', function (msg) { - self.activeQuery.handleRowDescription(msg) - }) - - // delegate dataRow to active query - con.on('dataRow', function (msg) { - self.activeQuery.handleDataRow(msg) - }) - - // delegate portalSuspended to active query - // eslint-disable-next-line no-unused-vars - con.on('portalSuspended', function (msg) { - self.activeQuery.handlePortalSuspended(con) - }) - - // delegate emptyQuery to active query - // eslint-disable-next-line no-unused-vars - con.on('emptyQuery', function (msg) { - self.activeQuery.handleEmptyQuery(con) - }) - - // delegate commandComplete to active query - con.on('commandComplete', function (msg) { - self.activeQuery.handleCommandComplete(msg, con) - }) - - // if a prepared statement has a name and properly parses - // we track that its already been executed so we don't parse - // it again on the same client - // eslint-disable-next-line no-unused-vars - con.on('parseComplete', function (msg) { - if (self.activeQuery.name) { - con.parsedStatements[self.activeQuery.name] = self.activeQuery.text - } - }) + // delegate dataRow to active query + con.on('dataRow', function (msg) { + self.activeQuery.handleDataRow(msg) + }) - // eslint-disable-next-line no-unused-vars - con.on('copyInResponse', function (msg) { - self.activeQuery.handleCopyInResponse(self.connection) - }) + // delegate portalSuspended to active query + // eslint-disable-next-line no-unused-vars + con.on('portalSuspended', function (msg) { + self.activeQuery.handlePortalSuspended(con) + }) - con.on('copyData', function (msg) { - self.activeQuery.handleCopyData(msg, self.connection) - }) + // delegate emptyQuery to active query + // eslint-disable-next-line no-unused-vars + con.on('emptyQuery', function (msg) { + self.activeQuery.handleEmptyQuery(con) + }) - con.on('notification', function (msg) { - self.emit('notification', msg) - }) -} + // delegate commandComplete to active query + con.on('commandComplete', function (msg) { + self.activeQuery.handleCommandComplete(msg, con) + }) -Client.prototype.getStartupConf = function () { - var params = this.connectionParameters + // if a prepared statement has a name and properly parses + // we track that its already been executed so we don't parse + // it again on the same client + // eslint-disable-next-line no-unused-vars + con.on('parseComplete', function (msg) { + if (self.activeQuery.name) { + con.parsedStatements[self.activeQuery.name] = self.activeQuery.text + } + }) - var data = { - user: params.user, - database: params.database, + con.on('copyInResponse', this.handleCopyInResponse.bind(this)) + con.on('copyData', this.handleCopyData.bind(this)) + con.on('notification', this.handleNotification.bind(this)) } - var appName = params.application_name || params.fallback_application_name - if (appName) { - data.application_name = appName + handleCopyInResponse(msg) { + this.activeQuery.handleCopyInResponse(this.connection) } - if (params.replication) { - data.replication = '' + params.replication - } - if (params.statement_timeout) { - data.statement_timeout = String(parseInt(params.statement_timeout, 10)) - } - if (params.idle_in_transaction_session_timeout) { - data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) + + handleCopyData(msg) { + this.activeQuery.handleCopyData(msg, this.connection) } - if (params.options) { - data.options = params.options + + handleNotification(msg) { + this.emit('notification', msg) } - return data -} + getStartupConf() { + var params = this.connectionParameters -Client.prototype.cancel = function (client, query) { - if (client.activeQuery === query) { - var con = this.connection + var data = { + user: params.user, + database: params.database, + } - if (this.host && this.host.indexOf('/') === 0) { - con.connect(this.host + '/.s.PGSQL.' + this.port) - } else { - con.connect(this.port, this.host) + var appName = params.application_name || params.fallback_application_name + if (appName) { + data.application_name = appName + } + if (params.replication) { + data.replication = '' + params.replication + } + if (params.statement_timeout) { + data.statement_timeout = String(parseInt(params.statement_timeout, 10)) + } + if (params.idle_in_transaction_session_timeout) { + data.idle_in_transaction_session_timeout = String(parseInt(params.idle_in_transaction_session_timeout, 10)) + } + if (params.options) { + data.options = params.options } - // once connection is established send cancel message - con.on('connect', function () { - con.cancel(client.processID, client.secretKey) - }) - } else if (client.queryQueue.indexOf(query) !== -1) { - client.queryQueue.splice(client.queryQueue.indexOf(query), 1) + return data } -} - -Client.prototype.setTypeParser = function (oid, format, parseFn) { - return this._types.setTypeParser(oid, format, parseFn) -} -Client.prototype.getTypeParser = function (oid, format) { - return this._types.getTypeParser(oid, format) -} + cancel(client, query) { + if (client.activeQuery === query) { + var con = this.connection -// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c -Client.prototype.escapeIdentifier = function (str) { - return '"' + str.replace(/"/g, '""') + '"' -} + if (this.host && this.host.indexOf('/') === 0) { + con.connect(this.host + '/.s.PGSQL.' + this.port) + } else { + con.connect(this.port, this.host) + } -// Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c -Client.prototype.escapeLiteral = function (str) { - var hasBackslash = false - var escaped = "'" - - for (var i = 0; i < str.length; i++) { - var c = str[i] - if (c === "'") { - escaped += c + c - } else if (c === '\\') { - escaped += c + c - hasBackslash = true - } else { - escaped += c + // once connection is established send cancel message + con.on('connect', function () { + con.cancel(client.processID, client.secretKey) + }) + } else if (client.queryQueue.indexOf(query) !== -1) { + client.queryQueue.splice(client.queryQueue.indexOf(query), 1) } } - escaped += "'" - - if (hasBackslash === true) { - escaped = ' E' + escaped + setTypeParser(oid, format, parseFn) { + return this._types.setTypeParser(oid, format, parseFn) } - return escaped -} + getTypeParser(oid, format) { + return this._types.getTypeParser(oid, format) + } -Client.prototype._pulseQueryQueue = function () { - if (this.readyForQuery === true) { - this.activeQuery = this.queryQueue.shift() - if (this.activeQuery) { - this.readyForQuery = false - this.hasExecuted = true + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + escapeIdentifier(str) { + return '"' + str.replace(/"/g, '""') + '"' + } - const queryError = this.activeQuery.submit(this.connection) - if (queryError) { - process.nextTick(() => { - this.activeQuery.handleError(queryError, this.connection) - this.readyForQuery = true - this._pulseQueryQueue() - }) + // Ported from PostgreSQL 9.2.4 source code in src/interfaces/libpq/fe-exec.c + escapeLiteral(str) { + var hasBackslash = false + var escaped = "'" + + for (var i = 0; i < str.length; i++) { + var c = str[i] + if (c === "'") { + escaped += c + c + } else if (c === '\\') { + escaped += c + c + hasBackslash = true + } else { + escaped += c } - } else if (this.hasExecuted) { - this.activeQuery = null - this.emit('drain') } - } -} -Client.prototype.query = function (config, values, callback) { - // can take in strings, config object or query object - var query - var result - var readTimeout - var readTimeoutTimer - var queryCallback - - if (config === null || config === undefined) { - throw new TypeError('Client was passed a null or undefined query') - } else if (typeof config.submit === 'function') { - readTimeout = config.query_timeout || this.connectionParameters.query_timeout - result = query = config - if (typeof values === 'function') { - query.callback = query.callback || values + escaped += "'" + + if (hasBackslash === true) { + escaped = ' E' + escaped } - } else { - readTimeout = this.connectionParameters.query_timeout - query = new Query(config, values, callback) - if (!query.callback) { - result = new this._Promise((resolve, reject) => { - query.callback = (err, res) => (err ? reject(err) : resolve(res)) - }) + + return escaped + } + + _pulseQueryQueue() { + if (this.readyForQuery === true) { + this.activeQuery = this.queryQueue.shift() + if (this.activeQuery) { + this.readyForQuery = false + this.hasExecuted = true + + const queryError = this.activeQuery.submit(this.connection) + if (queryError) { + process.nextTick(() => { + this.activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } + } else if (this.hasExecuted) { + this.activeQuery = null + this.emit('drain') + } } } - if (readTimeout) { - queryCallback = query.callback + query(config, values, callback) { + // can take in strings, config object or query object + var query + var result + var readTimeout + var readTimeoutTimer + var queryCallback + + if (config === null || config === undefined) { + throw new TypeError('Client was passed a null or undefined query') + } else if (typeof config.submit === 'function') { + readTimeout = config.query_timeout || this.connectionParameters.query_timeout + result = query = config + if (typeof values === 'function') { + query.callback = query.callback || values + } + } else { + readTimeout = this.connectionParameters.query_timeout + query = new Query(config, values, callback) + if (!query.callback) { + result = new this._Promise((resolve, reject) => { + query.callback = (err, res) => (err ? reject(err) : resolve(res)) + }) + } + } - readTimeoutTimer = setTimeout(() => { - var error = new Error('Query read timeout') + if (readTimeout) { + queryCallback = query.callback - process.nextTick(() => { - query.handleError(error, this.connection) - }) + readTimeoutTimer = setTimeout(() => { + var error = new Error('Query read timeout') - queryCallback(error) + process.nextTick(() => { + query.handleError(error, this.connection) + }) + + queryCallback(error) + + // we already returned an error, + // just do nothing if query completes + query.callback = () => {} + + // Remove from queue + var index = this.queryQueue.indexOf(query) + if (index > -1) { + this.queryQueue.splice(index, 1) + } - // we already returned an error, - // just do nothing if query completes - query.callback = () => {} + this._pulseQueryQueue() + }, readTimeout) - // Remove from queue - var index = this.queryQueue.indexOf(query) - if (index > -1) { - this.queryQueue.splice(index, 1) + query.callback = (err, res) => { + clearTimeout(readTimeoutTimer) + queryCallback(err, res) } + } - this._pulseQueryQueue() - }, readTimeout) + if (this.binary && !query.binary) { + query.binary = true + } - query.callback = (err, res) => { - clearTimeout(readTimeoutTimer) - queryCallback(err, res) + if (query._result && !query._result._types) { + query._result._types = this._types } - } - if (this.binary && !query.binary) { - query.binary = true - } + if (!this._queryable) { + process.nextTick(() => { + query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) + }) + return result + } - if (query._result && !query._result._types) { - query._result._types = this._types - } + if (this._ending) { + process.nextTick(() => { + query.handleError(new Error('Client was closed and is not queryable'), this.connection) + }) + return result + } - if (!this._queryable) { - process.nextTick(() => { - query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) - }) + this.queryQueue.push(query) + this._pulseQueryQueue() return result } - if (this._ending) { - process.nextTick(() => { - query.handleError(new Error('Client was closed and is not queryable'), this.connection) - }) - return result - } + end(cb) { + this._ending = true - this.queryQueue.push(query) - this._pulseQueryQueue() - return result -} + // if we have never connected, then end is a noop, callback immediately + if (!this.connection._connecting) { + if (cb) { + cb() + } else { + return this._Promise.resolve() + } + } -Client.prototype.end = function (cb) { - this._ending = true + if (this.activeQuery || !this._queryable) { + // if we have an active query we need to force a disconnect + // on the socket - otherwise a hung query could block end forever + this.connection.stream.destroy() + } else { + this.connection.end() + } - // if we have never connected, then end is a noop, callback immediately - if (!this.connection._connecting) { if (cb) { - cb() + this.connection.once('end', cb) } else { - return this._Promise.resolve() + return new this._Promise((resolve) => { + this.connection.once('end', resolve) + }) } } - - if (this.activeQuery || !this._queryable) { - // if we have an active query we need to force a disconnect - // on the socket - otherwise a hung query could block end forever - this.connection.stream.destroy() - } else { - this.connection.end() - } - - if (cb) { - this.connection.once('end', cb) - } else { - return new this._Promise((resolve) => { - this.connection.once('end', resolve) - }) - } } // expose a Query constructor diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 546682521..eae798d50 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -40,73 +40,6 @@ var readSSLConfigFromEnvironment = function () { return defaults.ssl } -var ConnectionParameters = function (config) { - // if a string is passed, it is a raw connection string so we parse it into a config - config = typeof config === 'string' ? parse(config) : config || {} - - // if the config has a connectionString defined, parse IT into the config we use - // this will override other default values with what is stored in connectionString - if (config.connectionString) { - config = Object.assign({}, config, parse(config.connectionString)) - } - - this.user = val('user', config) - this.database = val('database', config) - - if (this.database === undefined) { - this.database = this.user - } - - this.port = parseInt(val('port', config), 10) - this.host = val('host', config) - - // "hiding" the password so it doesn't show up in stack traces - // or if the client is console.logged - Object.defineProperty(this, 'password', { - configurable: true, - enumerable: false, - writable: true, - value: val('password', config), - }) - - this.binary = val('binary', config) - this.options = val('options', config) - - this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl - - // support passing in ssl=no-verify via connection string - if (this.ssl === 'no-verify') { - this.ssl = { rejectUnauthorized: false } - } - - this.client_encoding = val('client_encoding', config) - this.replication = val('replication', config) - // a domain socket begins with '/' - this.isDomainSocket = !(this.host || '').indexOf('/') - - this.application_name = val('application_name', config, 'PGAPPNAME') - this.fallback_application_name = val('fallback_application_name', config, false) - this.statement_timeout = val('statement_timeout', config, false) - this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false) - this.query_timeout = val('query_timeout', config, false) - - if (config.connectionTimeoutMillis === undefined) { - this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 - } else { - this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) - } - - if (config.keepAlive === false) { - this.keepalives = 0 - } else if (config.keepAlive === true) { - this.keepalives = 1 - } - - if (typeof config.keepAliveInitialDelayMillis === 'number') { - this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) - } -} - // Convert arg to a string, surround in single quotes, and escape single quotes and backslashes var quoteParamValue = function (value) { return "'" + ('' + value).replace(/\\/g, '\\\\').replace(/'/g, "\\'") + "'" @@ -119,43 +52,112 @@ var add = function (params, config, paramName) { } } -ConnectionParameters.prototype.getLibpqConnectionString = function (cb) { - var params = [] - add(params, this, 'user') - add(params, this, 'password') - add(params, this, 'port') - add(params, this, 'application_name') - add(params, this, 'fallback_application_name') - add(params, this, 'connect_timeout') - add(params, this, 'options') - - var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} - add(params, ssl, 'sslmode') - add(params, ssl, 'sslca') - add(params, ssl, 'sslkey') - add(params, ssl, 'sslcert') - add(params, ssl, 'sslrootcert') - - if (this.database) { - params.push('dbname=' + quoteParamValue(this.database)) - } - if (this.replication) { - params.push('replication=' + quoteParamValue(this.replication)) +class ConnectionParameters { + constructor(config) { + // if a string is passed, it is a raw connection string so we parse it into a config + config = typeof config === 'string' ? parse(config) : config || {} + + // if the config has a connectionString defined, parse IT into the config we use + // this will override other default values with what is stored in connectionString + if (config.connectionString) { + config = Object.assign({}, config, parse(config.connectionString)) + } + + this.user = val('user', config) + this.database = val('database', config) + + if (this.database === undefined) { + this.database = this.user + } + + this.port = parseInt(val('port', config), 10) + this.host = val('host', config) + + // "hiding" the password so it doesn't show up in stack traces + // or if the client is console.logged + Object.defineProperty(this, 'password', { + configurable: true, + enumerable: false, + writable: true, + value: val('password', config), + }) + + this.binary = val('binary', config) + this.options = val('options', config) + + this.ssl = typeof config.ssl === 'undefined' ? readSSLConfigFromEnvironment() : config.ssl + + // support passing in ssl=no-verify via connection string + if (this.ssl === 'no-verify') { + this.ssl = { rejectUnauthorized: false } + } + + this.client_encoding = val('client_encoding', config) + this.replication = val('replication', config) + // a domain socket begins with '/' + this.isDomainSocket = !(this.host || '').indexOf('/') + + this.application_name = val('application_name', config, 'PGAPPNAME') + this.fallback_application_name = val('fallback_application_name', config, false) + this.statement_timeout = val('statement_timeout', config, false) + this.idle_in_transaction_session_timeout = val('idle_in_transaction_session_timeout', config, false) + this.query_timeout = val('query_timeout', config, false) + + if (config.connectionTimeoutMillis === undefined) { + this.connect_timeout = process.env.PGCONNECT_TIMEOUT || 0 + } else { + this.connect_timeout = Math.floor(config.connectionTimeoutMillis / 1000) + } + + if (config.keepAlive === false) { + this.keepalives = 0 + } else if (config.keepAlive === true) { + this.keepalives = 1 + } + + if (typeof config.keepAliveInitialDelayMillis === 'number') { + this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) + } } - if (this.host) { - params.push('host=' + quoteParamValue(this.host)) - } - if (this.isDomainSocket) { - return cb(null, params.join(' ')) - } - if (this.client_encoding) { - params.push('client_encoding=' + quoteParamValue(this.client_encoding)) + + getLibpqConnectionString(cb) { + var params = [] + add(params, this, 'user') + add(params, this, 'password') + add(params, this, 'port') + add(params, this, 'application_name') + add(params, this, 'fallback_application_name') + add(params, this, 'connect_timeout') + add(params, this, 'options') + + var ssl = typeof this.ssl === 'object' ? this.ssl : this.ssl ? { sslmode: this.ssl } : {} + add(params, ssl, 'sslmode') + add(params, ssl, 'sslca') + add(params, ssl, 'sslkey') + add(params, ssl, 'sslcert') + add(params, ssl, 'sslrootcert') + + if (this.database) { + params.push('dbname=' + quoteParamValue(this.database)) + } + if (this.replication) { + params.push('replication=' + quoteParamValue(this.replication)) + } + if (this.host) { + params.push('host=' + quoteParamValue(this.host)) + } + if (this.isDomainSocket) { + return cb(null, params.join(' ')) + } + if (this.client_encoding) { + params.push('client_encoding=' + quoteParamValue(this.client_encoding)) + } + dns.lookup(this.host, function (err, address) { + if (err) return cb(err, null) + params.push('hostaddr=' + quoteParamValue(address)) + return cb(null, params.join(' ')) + }) } - dns.lookup(this.host, function (err, address) { - if (err) return cb(err, null) - params.push('hostaddr=' + quoteParamValue(address)) - return cb(null, params.join(' ')) - }) } module.exports = ConnectionParameters diff --git a/packages/pg/lib/result.js b/packages/pg/lib/result.js index 233455b06..5e895736b 100644 --- a/packages/pg/lib/result.js +++ b/packages/pg/lib/result.js @@ -9,95 +9,170 @@ var types = require('pg-types') +var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/ + // result object returned from query // in the 'end' event and also // passed as second argument to provided callback -var Result = function (rowMode, types) { - this.command = null - this.rowCount = null - this.oid = null - this.rows = [] - this.fields = [] - this._parsers = undefined - this._types = types - this.RowCtor = null - this.rowAsArray = rowMode === 'array' - if (this.rowAsArray) { - this.parseRow = this._parseRowAsArray +class Result { + constructor(rowMode, types) { + this.command = null + this.rowCount = null + this.oid = null + this.rows = [] + this.fields = [] + this._parsers = undefined + this._types = types + this.RowCtor = null + this.rowAsArray = rowMode === 'array' + if (this.rowAsArray) { + this.parseRow = this._parseRowAsArray + } } -} -var matchRegexp = /^([A-Za-z]+)(?: (\d+))?(?: (\d+))?/ + // adds a command complete message + addCommandComplete(msg) { + var match + if (msg.text) { + // pure javascript + match = matchRegexp.exec(msg.text) + } else { + // native bindings + match = matchRegexp.exec(msg.command) + } + if (match) { + this.command = match[1] + if (match[3]) { + // COMMMAND OID ROWS + this.oid = parseInt(match[2], 10) + this.rowCount = parseInt(match[3], 10) + } else if (match[2]) { + // COMMAND ROWS + this.rowCount = parseInt(match[2], 10) + } + } + } -// adds a command complete message -Result.prototype.addCommandComplete = function (msg) { - var match - if (msg.text) { - // pure javascript - match = matchRegexp.exec(msg.text) - } else { - // native bindings - match = matchRegexp.exec(msg.command) + _parseRowAsArray(rowData) { + var row = new Array(rowData.length) + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + if (rawValue !== null) { + row[i] = this._parsers[i](rawValue) + } else { + row[i] = null + } + } + return row } - if (match) { - this.command = match[1] - if (match[3]) { - // COMMMAND OID ROWS - this.oid = parseInt(match[2], 10) - this.rowCount = parseInt(match[3], 10) - } else if (match[2]) { - // COMMAND ROWS - this.rowCount = parseInt(match[2], 10) + + parseRow(rowData) { + var row = {} + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + var field = this.fields[i].name + if (rawValue !== null) { + row[field] = this._parsers[i](rawValue) + } else { + row[field] = null + } } + return row } -} -Result.prototype._parseRowAsArray = function (rowData) { - var row = new Array(rowData.length) - for (var i = 0, len = rowData.length; i < len; i++) { - var rawValue = rowData[i] - if (rawValue !== null) { - row[i] = this._parsers[i](rawValue) - } else { - row[i] = null + addRow(row) { + this.rows.push(row) + } + + addFields(fieldDescriptions) { + // clears field definitions + // multiple query statements in 1 action can result in multiple sets + // of rowDescriptions...eg: 'select NOW(); select 1::int;' + // you need to reset the fields + this.fields = fieldDescriptions + if (this.fields.length) { + this._parsers = new Array(fieldDescriptions.length) + } + for (var i = 0; i < fieldDescriptions.length; i++) { + var desc = fieldDescriptions[i] + if (this._types) { + this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } else { + this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } } } - return row -} -Result.prototype.parseRow = function (rowData) { - var row = {} - for (var i = 0, len = rowData.length; i < len; i++) { - var rawValue = rowData[i] - var field = this.fields[i].name - if (rawValue !== null) { - row[field] = this._parsers[i](rawValue) + // adds a command complete message + addCommandComplete(msg) { + var match + if (msg.text) { + // pure javascript + match = matchRegexp.exec(msg.text) } else { - row[field] = null + // native bindings + match = matchRegexp.exec(msg.command) + } + if (match) { + this.command = match[1] + if (match[3]) { + // COMMMAND OID ROWS + this.oid = parseInt(match[2], 10) + this.rowCount = parseInt(match[3], 10) + } else if (match[2]) { + // COMMAND ROWS + this.rowCount = parseInt(match[2], 10) + } } } - return row -} -Result.prototype.addRow = function (row) { - this.rows.push(row) -} + _parseRowAsArray(rowData) { + var row = new Array(rowData.length) + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + if (rawValue !== null) { + row[i] = this._parsers[i](rawValue) + } else { + row[i] = null + } + } + return row + } -Result.prototype.addFields = function (fieldDescriptions) { - // clears field definitions - // multiple query statements in 1 action can result in multiple sets - // of rowDescriptions...eg: 'select NOW(); select 1::int;' - // you need to reset the fields - this.fields = fieldDescriptions - if (this.fields.length) { - this._parsers = new Array(fieldDescriptions.length) + parseRow(rowData) { + var row = {} + for (var i = 0, len = rowData.length; i < len; i++) { + var rawValue = rowData[i] + var field = this.fields[i].name + if (rawValue !== null) { + row[field] = this._parsers[i](rawValue) + } else { + row[field] = null + } + } + return row } - for (var i = 0; i < fieldDescriptions.length; i++) { - var desc = fieldDescriptions[i] - if (this._types) { - this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') - } else { - this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + + addRow(row) { + this.rows.push(row) + } + + addFields(fieldDescriptions) { + // clears field definitions + // multiple query statements in 1 action can result in multiple sets + // of rowDescriptions...eg: 'select NOW(); select 1::int;' + // you need to reset the fields + this.fields = fieldDescriptions + if (this.fields.length) { + this._parsers = new Array(fieldDescriptions.length) + } + for (var i = 0; i < fieldDescriptions.length; i++) { + var desc = fieldDescriptions[i] + if (this._types) { + this._parsers[i] = this._types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } else { + this._parsers[i] = types.getTypeParser(desc.dataTypeID, desc.format || 'text') + } } } } From 66e1e76c9bdc110d9bc42baf71ee6beefd067983 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:05:31 -0500 Subject: [PATCH 02/12] More refactoring --- packages/pg/bench.js | 2 +- packages/pg/lib/client.js | 261 ++++++++++++++++++++------------------ 2 files changed, 136 insertions(+), 127 deletions(-) diff --git a/packages/pg/bench.js b/packages/pg/bench.js index 1c1aa641d..a668aa85f 100644 --- a/packages/pg/bench.js +++ b/packages/pg/bench.js @@ -61,7 +61,7 @@ const run = async () => { queries = await bench(client, insert, seconds * 1000) console.log('insert queries:', queries) console.log('qps', queries / seconds) - console.log('on my laptop best so far seen 5799 qps') + console.log('on my laptop best so far seen 6303 qps') console.log('') console.log('Warming up bytea test') diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index fd9ecad19..2dbebe855 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -122,94 +122,25 @@ class Client extends EventEmitter { con.startup(self.getStartupConf()) }) - function checkPgPass(cb) { - return function (msg) { - if (typeof self.password === 'function') { - self._Promise - .resolve() - .then(() => self.password()) - .then((pass) => { - if (pass !== undefined) { - if (typeof pass !== 'string') { - con.emit('error', new TypeError('Password must be a string')) - return - } - self.connectionParameters.password = self.password = pass - } else { - self.connectionParameters.password = self.password = null - } - cb(msg) - }) - .catch((err) => { - con.emit('error', err) - }) - } else if (self.password !== null) { - cb(msg) - } else { - pgPass(self.connectionParameters, function (pass) { - if (undefined !== pass) { - self.connectionParameters.password = self.password = pass - } - cb(msg) - }) - } - } - } - // password request handling - con.on( - 'authenticationCleartextPassword', - checkPgPass(function () { - con.password(self.password) - }) - ) - + con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) // password request handling - con.on( - 'authenticationMD5Password', - checkPgPass(function (msg) { - con.password(utils.postgresMd5PasswordHash(self.user, self.password, msg.salt)) - }) - ) - - // password request handling (SASL) - var saslSession - con.on( - 'authenticationSASL', - checkPgPass(function (msg) { - saslSession = sasl.startSession(msg.mechanisms) - - con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) - }) - ) - - // password request handling (SASL) - con.on('authenticationSASLContinue', function (msg) { - sasl.continueSession(saslSession, self.password, msg.data) - - con.sendSCRAMClientFinalMessage(saslSession.response) - }) - + con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) // password request handling (SASL) - con.on('authenticationSASLFinal', function (msg) { - sasl.finalizeSession(saslSession, msg.data) - - saslSession = null - }) - - con.once('backendKeyData', function (msg) { - self.processID = msg.processID - self.secretKey = msg.secretKey - }) + con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) + con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) + con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) + con.once('backendKeyData', this.handleBackendKeyData.bind(this)) + this._connectionCallback = callback const connectingErrorHandler = (err) => { if (this._connectionError) { return } this._connectionError = true clearTimeout(connectionTimeoutHandle) - if (callback) { - return callback(err) + if (this._connectionCallback) { + return this._connectionCallback(err) } this.emit('error', err) } @@ -237,10 +168,9 @@ class Client extends EventEmitter { // hook up query handling events to connection // after the connection initially becomes ready for queries - con.once('readyForQuery', function () { + con.once('readyForQuery', () => { self._connecting = false self._connected = true - self._attachListeners(con) con.removeListener('error', connectingErrorHandler) con.removeListener('errorMessage', connectingErrorHandler) con.on('error', connectedErrorHandler) @@ -248,24 +178,18 @@ class Client extends EventEmitter { clearTimeout(connectionTimeoutHandle) // process possible callback argument to Client#connect - if (callback) { - callback(null, self) + if (this._connectionCallback) { + this._connectionCallback(null, self) // remove callback for proper error handling // after the connect event - callback = null + this._connectionCallback = null } self.emit('connect') }) - con.on('readyForQuery', function () { - var activeQuery = self.activeQuery - self.activeQuery = null - self.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(con) - } - self._pulseQueryQueue() - }) + con.on('readyForQuery', this.handleReadyForQuery.bind(this)) + con.on('notice', this.handleNotice.bind(this)) + self._attachListeners(con) con.once('end', () => { const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') @@ -279,8 +203,8 @@ class Client extends EventEmitter { // treat this as an error unless we've already emitted an error // during connection. if (this._connecting && !this._connectionError) { - if (callback) { - callback(error) + if (this._connectionCallback) { + this._connectionCallback(error) } else { connectedErrorHandler(error) } @@ -293,10 +217,6 @@ class Client extends EventEmitter { this.emit('end') }) }) - - con.on('notice', function (msg) { - self.emit('notice', msg) - }) } connect(callback) { @@ -317,47 +237,132 @@ class Client extends EventEmitter { } _attachListeners(con) { - const self = this - // delegate rowDescription to active query - con.on('rowDescription', function (msg) { - self.activeQuery.handleRowDescription(msg) + con.on('rowDescription', this.handleRowDescription.bind(this)) + con.on('dataRow', this.handleDataRow.bind(this)) + con.on('portalSuspended', this.handlePortalSuspended.bind(this)) + con.on('emptyQuery', this.handleEmptyQuery.bind(this)) + con.on('commandComplete', this.handleCommandComplete.bind(this)) + con.on('parseComplete', this.handleParseComplete.bind(this)) + con.on('copyInResponse', this.handleCopyInResponse.bind(this)) + con.on('copyData', this.handleCopyData.bind(this)) + con.on('notification', this.handleNotification.bind(this)) + } + + // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function + // it can be supplied by the user if required - this is a breaking change! + _checkPgPass(cb) { + return function (msg) { + if (typeof this.password === 'function') { + this._Promise + .resolve() + .then(() => this.password()) + .then((pass) => { + if (pass !== undefined) { + if (typeof pass !== 'string') { + con.emit('error', new TypeError('Password must be a string')) + return + } + this.connectionParameters.password = this.password = pass + } else { + this.connectionParameters.password = this.password = null + } + cb(msg) + }) + .catch((err) => { + con.emit('error', err) + }) + } else if (this.password !== null) { + cb(msg) + } else { + pgPass(this.connectionParameters, function (pass) { + if (undefined !== pass) { + this.connectionParameters.password = this.password = pass + } + cb(msg) + }) + } + } + } + + handleAuthenticationCleartextPassword(msg) { + this._checkPgPass(() => { + this.connection.password(this.password) }) + } - // delegate dataRow to active query - con.on('dataRow', function (msg) { - self.activeQuery.handleDataRow(msg) + handleAuthenticationMD5Password(msg) { + this._checkPgPass((msg) => { + const hashedPassword = utils.postgresMd5PasswordHash(this.user, this.password, msg.salt) + this.connection.password(hashedPassword) }) + } - // delegate portalSuspended to active query - // eslint-disable-next-line no-unused-vars - con.on('portalSuspended', function (msg) { - self.activeQuery.handlePortalSuspended(con) + handleAuthenticationSASL(msg) { + this._checkPgPass((msg) => { + this.saslSession = sasl.startSession(msg.mechanisms) + const con = this.connection + con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) }) + } + handleAuthenticationSASLContinue(msg) { + const { saslSession } = this + sasl.continueSession(saslSession, self.password, msg.data) + con.sendSCRAMClientFinalMessage(saslSession.response) + } + + handleAuthenticationSASLFinal(msg) { + sasl.finalizeSession(this.saslSession, msg.data) + this.saslSession = null + } + + handleBackendKeyData(msg) { + this.processID = msg.processID + this.secretKey = msg.secretKey + } + + handleReadyForQuery(msg) { + const { activeQuery } = this + this.activeQuery = null + this.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + } + this._pulseQueryQueue() + } + + handleRowDescription(msg) { + // delegate rowDescription to active query + this.activeQuery.handleRowDescription(msg) + } + + handleDataRow(msg) { + // delegate dataRow to active query + this.activeQuery.handleDataRow(msg) + } + + handlePortalSuspended(msg) { + // delegate portalSuspended to active query + this.activeQuery.handlePortalSuspended(this.connection) + } + + handleEmptyQuery(msg) { // delegate emptyQuery to active query - // eslint-disable-next-line no-unused-vars - con.on('emptyQuery', function (msg) { - self.activeQuery.handleEmptyQuery(con) - }) + this.activeQuery.handleEmptyQuery(this.connection) + } + handleCommandComplete(msg) { // delegate commandComplete to active query - con.on('commandComplete', function (msg) { - self.activeQuery.handleCommandComplete(msg, con) - }) + this.activeQuery.handleCommandComplete(msg, this.connection) + } + handleParseComplete(msg) { // if a prepared statement has a name and properly parses // we track that its already been executed so we don't parse // it again on the same client - // eslint-disable-next-line no-unused-vars - con.on('parseComplete', function (msg) { - if (self.activeQuery.name) { - con.parsedStatements[self.activeQuery.name] = self.activeQuery.text - } - }) - - con.on('copyInResponse', this.handleCopyInResponse.bind(this)) - con.on('copyData', this.handleCopyData.bind(this)) - con.on('notification', this.handleNotification.bind(this)) + if (this.activeQuery.name) { + this.connection.parsedStatements[this.activeQuery.name] = this.activeQuery.text + } } handleCopyInResponse(msg) { @@ -372,6 +377,10 @@ class Client extends EventEmitter { this.emit('notification', msg) } + handleNotice(msg) { + this.emit('notice', msg) + } + getStartupConf() { var params = this.connectionParameters From 0b424cfff18e338e3860496ad957a178fed1892f Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:16:46 -0500 Subject: [PATCH 03/12] Move more functionality to methods --- packages/pg/lib/client.js | 74 +++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 2dbebe855..926fa6bba 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -95,9 +95,9 @@ class Client extends EventEmitter { } this._connecting = true - var connectionTimeoutHandle + this.connectionTimeoutHandle if (this._connectionTimeoutMillis > 0) { - connectionTimeoutHandle = setTimeout(() => { + this.connectionTimeoutHandle = setTimeout(() => { con._ending = true con.stream.destroy(new Error('timeout expired')) }, this._connectionTimeoutMillis) @@ -133,35 +133,11 @@ class Client extends EventEmitter { con.once('backendKeyData', this.handleBackendKeyData.bind(this)) this._connectionCallback = callback - const connectingErrorHandler = (err) => { - if (this._connectionError) { - return - } - this._connectionError = true - clearTimeout(connectionTimeoutHandle) - if (this._connectionCallback) { - return this._connectionCallback(err) - } - this.emit('error', err) - } - - const connectedErrorHandler = (err) => { - this._queryable = false - this._errorAllQueries(err) - this.emit('error', err) - } + const connectingErrorHandler = this.handleErrorWhileConnecting.bind(this) - const connectedErrorMessageHandler = (msg) => { - const activeQuery = this.activeQuery + const connectedErrorHandler = this.handleErrorWhileConnected.bind(this) - if (!activeQuery) { - connectedErrorHandler(msg) - return - } - - this.activeQuery = null - activeQuery.handleError(msg, con) - } + const connectedErrorMessageHandler = this.handleErrorMessage.bind(this) con.on('error', connectingErrorHandler) con.on('errorMessage', connectingErrorHandler) @@ -175,7 +151,7 @@ class Client extends EventEmitter { con.removeListener('errorMessage', connectingErrorHandler) con.on('error', connectedErrorHandler) con.on('errorMessage', connectedErrorMessageHandler) - clearTimeout(connectionTimeoutHandle) + clearTimeout(this.connectionTimeoutHandle) // process possible callback argument to Client#connect if (this._connectionCallback) { @@ -194,7 +170,7 @@ class Client extends EventEmitter { con.once('end', () => { const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') - clearTimeout(connectionTimeoutHandle) + clearTimeout(this.connectionTimeoutHandle) this._errorAllQueries(error) if (!this._ending) { @@ -331,6 +307,42 @@ class Client extends EventEmitter { this._pulseQueryQueue() } + // if we receieve an error during the connection process we handle it here + handleErrorWhileConnecting(err) { + if (this._connectionError) { + // TODO(bmc): this is swallowing errors - we shouldn't do this + return + } + this._connectionError = true + clearTimeout(this.connectionTimeoutHandle) + if (this._connectionCallback) { + return this._connectionCallback(err) + } + this.emit('error', err) + } + + // if we're connected and we receive an error event from the connection + // this means the socket is dead - do a hard abort of all queries and emit + // the socket error on the client as well + handleErrorWhileConnected(err) { + this._queryable = false + this._errorAllQueries(err) + this.emit('error', err) + } + + // handle error messages from the postgres backend + handleErrorMessage(msg) { + const activeQuery = this.activeQuery + + if (!activeQuery) { + this.handleErrorWhileConnected(msg) + return + } + + this.activeQuery = null + activeQuery.handleError(msg, this.connection) + } + handleRowDescription(msg) { // delegate rowDescription to active query this.activeQuery.handleRowDescription(msg) From 63e15d15fab69fc769995ce6bf45a82175923919 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:31:16 -0500 Subject: [PATCH 04/12] Refactor --- packages/pg/lib/client.js | 94 ++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 51 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 926fa6bba..7f1356e98 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -86,6 +86,8 @@ class Client extends EventEmitter { _connect(callback) { var self = this var con = this.connection + this._connectionCallback = callback + if (this._connecting || this._connected) { const err = new Error('Client has already been connected. You cannot reuse a client.') process.nextTick(() => { @@ -122,50 +124,7 @@ class Client extends EventEmitter { con.startup(self.getStartupConf()) }) - // password request handling - con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) - // password request handling - con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) - // password request handling (SASL) - con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) - con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) - con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) - con.once('backendKeyData', this.handleBackendKeyData.bind(this)) - - this._connectionCallback = callback - const connectingErrorHandler = this.handleErrorWhileConnecting.bind(this) - - const connectedErrorHandler = this.handleErrorWhileConnected.bind(this) - - const connectedErrorMessageHandler = this.handleErrorMessage.bind(this) - - con.on('error', connectingErrorHandler) - con.on('errorMessage', connectingErrorHandler) - - // hook up query handling events to connection - // after the connection initially becomes ready for queries - con.once('readyForQuery', () => { - self._connecting = false - self._connected = true - con.removeListener('error', connectingErrorHandler) - con.removeListener('errorMessage', connectingErrorHandler) - con.on('error', connectedErrorHandler) - con.on('errorMessage', connectedErrorMessageHandler) - clearTimeout(this.connectionTimeoutHandle) - - // process possible callback argument to Client#connect - if (this._connectionCallback) { - this._connectionCallback(null, self) - // remove callback for proper error handling - // after the connect event - this._connectionCallback = null - } - self.emit('connect') - }) - - con.on('readyForQuery', this.handleReadyForQuery.bind(this)) - con.on('notice', this.handleNotice.bind(this)) - self._attachListeners(con) + this._attachListeners(con) con.once('end', () => { const error = this._ending ? new Error('Connection terminated') : new Error('Connection terminated unexpectedly') @@ -182,10 +141,10 @@ class Client extends EventEmitter { if (this._connectionCallback) { this._connectionCallback(error) } else { - connectedErrorHandler(error) + this.handleErrorWhileConnected(error) } } else if (!this._connectionError) { - connectedErrorHandler(error) + this.handleErrorWhileConnected(error) } } @@ -213,6 +172,19 @@ class Client extends EventEmitter { } _attachListeners(con) { + // password request handling + con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) + // password request handling + con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) + // password request handling (SASL) + con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) + con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) + con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) + con.on('backendKeyData', this.handleBackendKeyData.bind(this)) + con.on('error', this.handleErrorWhileConnecting) + con.on('errorMessage', this.handleErrorMessage) + con.on('readyForQuery', this.handleReadyForQuery.bind(this)) + con.on('notice', this.handleNotice.bind(this)) con.on('rowDescription', this.handleRowDescription.bind(this)) con.on('dataRow', this.handleDataRow.bind(this)) con.on('portalSuspended', this.handlePortalSuspended.bind(this)) @@ -283,7 +255,7 @@ class Client extends EventEmitter { handleAuthenticationSASLContinue(msg) { const { saslSession } = this - sasl.continueSession(saslSession, self.password, msg.data) + sasl.continueSession(saslSession, this.password, msg.data) con.sendSCRAMClientFinalMessage(saslSession.response) } @@ -298,6 +270,23 @@ class Client extends EventEmitter { } handleReadyForQuery(msg) { + if (this._connecting) { + this._connecting = false + this._connected = true + const con = this.connection + con.removeListener('error', this.handleErrorWhileConnecting) + con.on('error', this.handleErrorWhileConnected) + clearTimeout(this.connectionTimeoutHandle) + + // process possible callback argument to Client#connect + if (this._connectionCallback) { + this._connectionCallback(null, this) + // remove callback for proper error handling + // after the connect event + this._connectionCallback = null + } + this.emit('connect') + } const { activeQuery } = this this.activeQuery = null this.readyForQuery = true @@ -307,8 +296,8 @@ class Client extends EventEmitter { this._pulseQueryQueue() } - // if we receieve an error during the connection process we handle it here - handleErrorWhileConnecting(err) { + // if we receieve an error event or error message during the connection process we handle it here + handleErrorWhileConnecting = (err) => { if (this._connectionError) { // TODO(bmc): this is swallowing errors - we shouldn't do this return @@ -324,14 +313,17 @@ class Client extends EventEmitter { // if we're connected and we receive an error event from the connection // this means the socket is dead - do a hard abort of all queries and emit // the socket error on the client as well - handleErrorWhileConnected(err) { + handleErrorWhileConnected = (err) => { this._queryable = false this._errorAllQueries(err) this.emit('error', err) } // handle error messages from the postgres backend - handleErrorMessage(msg) { + handleErrorMessage = (msg) => { + if (this._connecting) { + return this.handleErrorWhileConnecting(msg) + } const activeQuery = this.activeQuery if (!activeQuery) { From 9d1dce9c5ddb654d9ab5bd3a4f4027b9889348d7 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:42:33 -0500 Subject: [PATCH 05/12] Mark handler methods as 'private' --- packages/pg/lib/client.js | 95 ++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 7f1356e98..1cac61f8b 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -141,10 +141,10 @@ class Client extends EventEmitter { if (this._connectionCallback) { this._connectionCallback(error) } else { - this.handleErrorWhileConnected(error) + this._handleErrorEvent(error) } } else if (!this._connectionError) { - this.handleErrorWhileConnected(error) + this._handleErrorEvent(error) } } @@ -173,27 +173,27 @@ class Client extends EventEmitter { _attachListeners(con) { // password request handling - con.on('authenticationCleartextPassword', this.handleAuthenticationCleartextPassword.bind(this)) + con.on('authenticationCleartextPassword', this._handleAuthCleartextPassword.bind(this)) // password request handling - con.on('authenticationMD5Password', this.handleAuthenticationMD5Password.bind(this)) + con.on('authenticationMD5Password', this._handleAuthMD5Password.bind(this)) // password request handling (SASL) - con.on('authenticationSASL', this.handleAuthenticationSASL.bind(this)) - con.on('authenticationSASLContinue', this.handleAuthenticationSASLContinue.bind(this)) - con.on('authenticationSASLFinal', this.handleAuthenticationSASLFinal.bind(this)) - con.on('backendKeyData', this.handleBackendKeyData.bind(this)) - con.on('error', this.handleErrorWhileConnecting) - con.on('errorMessage', this.handleErrorMessage) - con.on('readyForQuery', this.handleReadyForQuery.bind(this)) - con.on('notice', this.handleNotice.bind(this)) - con.on('rowDescription', this.handleRowDescription.bind(this)) - con.on('dataRow', this.handleDataRow.bind(this)) - con.on('portalSuspended', this.handlePortalSuspended.bind(this)) - con.on('emptyQuery', this.handleEmptyQuery.bind(this)) - con.on('commandComplete', this.handleCommandComplete.bind(this)) - con.on('parseComplete', this.handleParseComplete.bind(this)) - con.on('copyInResponse', this.handleCopyInResponse.bind(this)) - con.on('copyData', this.handleCopyData.bind(this)) - con.on('notification', this.handleNotification.bind(this)) + con.on('authenticationSASL', this._handleAuthSASL.bind(this)) + con.on('authenticationSASLContinue', this._handleAuthSASLContinue.bind(this)) + con.on('authenticationSASLFinal', this._handleAuthSASLFinal.bind(this)) + con.on('backendKeyData', this._handleBackendKeyData.bind(this)) + con.on('error', this._handleErrorEvent) + con.on('errorMessage', this._handleErrorMessage) + con.on('readyForQuery', this._handleReadyForQuery.bind(this)) + con.on('notice', this._handleNotice.bind(this)) + con.on('rowDescription', this._handleRowDescription.bind(this)) + con.on('dataRow', this._handleDataRow.bind(this)) + con.on('portalSuspended', this._handlePortalSuspended.bind(this)) + con.on('emptyQuery', this._handleEmptyQuery.bind(this)) + con.on('commandComplete', this._handleCommandComplete.bind(this)) + con.on('parseComplete', this._handleParseComplete.bind(this)) + con.on('copyInResponse', this._handleCopyInResponse.bind(this)) + con.on('copyData', this._handleCopyData.bind(this)) + con.on('notification', this._handleNotification.bind(this)) } // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function @@ -232,20 +232,20 @@ class Client extends EventEmitter { } } - handleAuthenticationCleartextPassword(msg) { + _handleAuthCleartextPassword(msg) { this._checkPgPass(() => { this.connection.password(this.password) }) } - handleAuthenticationMD5Password(msg) { + _handleAuthMD5Password(msg) { this._checkPgPass((msg) => { const hashedPassword = utils.postgresMd5PasswordHash(this.user, this.password, msg.salt) this.connection.password(hashedPassword) }) } - handleAuthenticationSASL(msg) { + _handleAuthSASL(msg) { this._checkPgPass((msg) => { this.saslSession = sasl.startSession(msg.mechanisms) const con = this.connection @@ -253,29 +253,26 @@ class Client extends EventEmitter { }) } - handleAuthenticationSASLContinue(msg) { + _handleAuthSASLContinue(msg) { const { saslSession } = this sasl.continueSession(saslSession, this.password, msg.data) con.sendSCRAMClientFinalMessage(saslSession.response) } - handleAuthenticationSASLFinal(msg) { + _handleAuthSASLFinal(msg) { sasl.finalizeSession(this.saslSession, msg.data) this.saslSession = null } - handleBackendKeyData(msg) { + _handleBackendKeyData(msg) { this.processID = msg.processID this.secretKey = msg.secretKey } - handleReadyForQuery(msg) { + _handleReadyForQuery(msg) { if (this._connecting) { this._connecting = false this._connected = true - const con = this.connection - con.removeListener('error', this.handleErrorWhileConnecting) - con.on('error', this.handleErrorWhileConnected) clearTimeout(this.connectionTimeoutHandle) // process possible callback argument to Client#connect @@ -296,8 +293,9 @@ class Client extends EventEmitter { this._pulseQueryQueue() } - // if we receieve an error event or error message during the connection process we handle it here - handleErrorWhileConnecting = (err) => { + // if we receieve an error event or error message + // during the connection process we handle it here + _handleErrorWhileConnecting = (err) => { if (this._connectionError) { // TODO(bmc): this is swallowing errors - we shouldn't do this return @@ -313,21 +311,24 @@ class Client extends EventEmitter { // if we're connected and we receive an error event from the connection // this means the socket is dead - do a hard abort of all queries and emit // the socket error on the client as well - handleErrorWhileConnected = (err) => { + _handleErrorEvent = (err) => { + if (this._connecting) { + return this._handleErrorWhileConnecting(err) + } this._queryable = false this._errorAllQueries(err) this.emit('error', err) } // handle error messages from the postgres backend - handleErrorMessage = (msg) => { + _handleErrorMessage = (msg) => { if (this._connecting) { - return this.handleErrorWhileConnecting(msg) + return this._handleErrorWhileConnecting(msg) } const activeQuery = this.activeQuery if (!activeQuery) { - this.handleErrorWhileConnected(msg) + this._handleErrorEvent(msg) return } @@ -335,32 +336,32 @@ class Client extends EventEmitter { activeQuery.handleError(msg, this.connection) } - handleRowDescription(msg) { + _handleRowDescription(msg) { // delegate rowDescription to active query this.activeQuery.handleRowDescription(msg) } - handleDataRow(msg) { + _handleDataRow(msg) { // delegate dataRow to active query this.activeQuery.handleDataRow(msg) } - handlePortalSuspended(msg) { + _handlePortalSuspended(msg) { // delegate portalSuspended to active query this.activeQuery.handlePortalSuspended(this.connection) } - handleEmptyQuery(msg) { + _handleEmptyQuery(msg) { // delegate emptyQuery to active query this.activeQuery.handleEmptyQuery(this.connection) } - handleCommandComplete(msg) { + _handleCommandComplete(msg) { // delegate commandComplete to active query this.activeQuery.handleCommandComplete(msg, this.connection) } - handleParseComplete(msg) { + _handleParseComplete(msg) { // if a prepared statement has a name and properly parses // we track that its already been executed so we don't parse // it again on the same client @@ -369,19 +370,19 @@ class Client extends EventEmitter { } } - handleCopyInResponse(msg) { + _handleCopyInResponse(msg) { this.activeQuery.handleCopyInResponse(this.connection) } - handleCopyData(msg) { + _handleCopyData(msg) { this.activeQuery.handleCopyData(msg, this.connection) } - handleNotification(msg) { + _handleNotification(msg) { this.emit('notification', msg) } - handleNotice(msg) { + _handleNotice(msg) { this.emit('notice', msg) } From 5ba7e3fb48f70ac749aea0d1ffa0cfbd45fec6e2 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 11:49:54 -0500 Subject: [PATCH 06/12] Refactor connection to class --- packages/pg/lib/connection.js | 332 +++++++++++++++++----------------- 1 file changed, 166 insertions(+), 166 deletions(-) diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 65867026d..0aa3c0969 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -13,201 +13,201 @@ var util = require('util') const { parse, serialize } = require('pg-protocol') -// TODO(bmc) support binary mode at some point -var Connection = function (config) { - EventEmitter.call(this) - config = config || {} - this.stream = config.stream || new net.Socket() - this._keepAlive = config.keepAlive - this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis - this.lastBuffer = false - this.parsedStatements = {} - this.ssl = config.ssl || false - this._ending = false - this._emitMessage = false - var self = this - this.on('newListener', function (eventName) { - if (eventName === 'message') { - self._emitMessage = true - } - }) -} - -util.inherits(Connection, EventEmitter) - -Connection.prototype.connect = function (port, host) { - var self = this - - this._connecting = true - this.stream.setNoDelay(true) - this.stream.connect(port, host) +const flushBuffer = serialize.flush() +const syncBuffer = serialize.sync() +const endBuffer = serialize.end() - this.stream.once('connect', function () { - if (self._keepAlive) { - self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) - } - self.emit('connect') - }) +// TODO(bmc) support binary mode at some point +class Connection extends EventEmitter { + constructor(config) { + super() + config = config || {} + this.stream = config.stream || new net.Socket() + this._keepAlive = config.keepAlive + this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis + this.lastBuffer = false + this.parsedStatements = {} + this.ssl = config.ssl || false + this._ending = false + this._emitMessage = false + var self = this + this.on('newListener', function (eventName) { + if (eventName === 'message') { + self._emitMessage = true + } + }) + } - const reportStreamError = function (error) { - // errors about disconnections should be ignored during disconnect - if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { - return - } - self.emit('error', error) - } - this.stream.on('error', reportStreamError) - - this.stream.on('close', function () { - self.emit('end') - }) - - if (!this.ssl) { - return this.attachListeners(this.stream) - } - - this.stream.once('data', function (buffer) { - var responseCode = buffer.toString('utf8') - switch (responseCode) { - case 'S': // Server supports SSL connections, continue with a secure connection - break - case 'N': // Server does not support SSL connections - self.stream.end() - return self.emit('error', new Error('The server does not support SSL connections')) - default: - // Any other response byte, including 'E' (ErrorResponse) indicating a server error - self.stream.end() - return self.emit('error', new Error('There was an error establishing an SSL connection')) + connect(port, host) { + var self = this + + this._connecting = true + this.stream.setNoDelay(true) + this.stream.connect(port, host) + + this.stream.once('connect', function () { + if (self._keepAlive) { + self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) + } + self.emit('connect') + }) + + const reportStreamError = function (error) { + // errors about disconnections should be ignored during disconnect + if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { + return + } + self.emit('error', error) } - var tls = require('tls') - const options = Object.assign( - { - socket: self.stream, - }, - self.ssl - ) - if (net.isIP(host) === 0) { - options.servername = host - } - self.stream = tls.connect(options) - self.attachListeners(self.stream) - self.stream.on('error', reportStreamError) + this.stream.on('error', reportStreamError) - self.emit('sslconnect') - }) -} + this.stream.on('close', function () { + self.emit('end') + }) -Connection.prototype.attachListeners = function (stream) { - stream.on('end', () => { - this.emit('end') - }) - parse(stream, (msg) => { - var eventName = msg.name === 'error' ? 'errorMessage' : msg.name - if (this._emitMessage) { - this.emit('message', msg) + if (!this.ssl) { + return this.attachListeners(this.stream) } - this.emit(eventName, msg) - }) -} -Connection.prototype.requestSsl = function () { - this.stream.write(serialize.requestSsl()) -} + this.stream.once('data', function (buffer) { + var responseCode = buffer.toString('utf8') + switch (responseCode) { + case 'S': // Server supports SSL connections, continue with a secure connection + break + case 'N': // Server does not support SSL connections + self.stream.end() + return self.emit('error', new Error('The server does not support SSL connections')) + default: + // Any other response byte, including 'E' (ErrorResponse) indicating a server error + self.stream.end() + return self.emit('error', new Error('There was an error establishing an SSL connection')) + } + var tls = require('tls') + const options = Object.assign( + { + socket: self.stream, + }, + self.ssl + ) + if (net.isIP(host) === 0) { + options.servername = host + } + self.stream = tls.connect(options) + self.attachListeners(self.stream) + self.stream.on('error', reportStreamError) + + self.emit('sslconnect') + }) + } -Connection.prototype.startup = function (config) { - this.stream.write(serialize.startup(config)) -} + attachListeners(stream) { + stream.on('end', () => { + this.emit('end') + }) + parse(stream, (msg) => { + var eventName = msg.name === 'error' ? 'errorMessage' : msg.name + if (this._emitMessage) { + this.emit('message', msg) + } + this.emit(eventName, msg) + }) + } -Connection.prototype.cancel = function (processID, secretKey) { - this._send(serialize.cancel(processID, secretKey)) -} + requestSsl() { + this.stream.write(serialize.requestSsl()) + } -Connection.prototype.password = function (password) { - this._send(serialize.password(password)) -} + startup(config) { + this.stream.write(serialize.startup(config)) + } -Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) -} + cancel(processID, secretKey) { + this._send(serialize.cancel(processID, secretKey)) + } -Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) -} + password(password) { + this._send(serialize.password(password)) + } -Connection.prototype._send = function (buffer) { - if (!this.stream.writable) { - return false + sendSASLInitialResponseMessage(mechanism, initialResponse) { + this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) } - return this.stream.write(buffer) -} -Connection.prototype.query = function (text) { - this._send(serialize.query(text)) -} + sendSCRAMClientFinalMessage(additionalData) { + this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) + } -// send parse message -Connection.prototype.parse = function (query) { - this._send(serialize.parse(query)) -} + _send(buffer) { + if (!this.stream.writable) { + return false + } + return this.stream.write(buffer) + } -// send bind message -// "more" === true to buffer the message until flush() is called -Connection.prototype.bind = function (config) { - this._send(serialize.bind(config)) -} + query(text) { + this._send(serialize.query(text)) + } -// send execute message -// "more" === true to buffer the message until flush() is called -Connection.prototype.execute = function (config) { - this._send(serialize.execute(config)) -} + // send parse message + parse(query) { + this._send(serialize.parse(query)) + } -const flushBuffer = serialize.flush() -Connection.prototype.flush = function () { - if (this.stream.writable) { - this.stream.write(flushBuffer) + // send bind message + // "more" === true to buffer the message until flush() is called + bind(config) { + this._send(serialize.bind(config)) } -} -const syncBuffer = serialize.sync() -Connection.prototype.sync = function () { - this._ending = true - this._send(flushBuffer) - this._send(syncBuffer) -} + // send execute message + // "more" === true to buffer the message until flush() is called + execute(config) { + this._send(serialize.execute(config)) + } -const endBuffer = serialize.end() + flush() { + if (this.stream.writable) { + this.stream.write(flushBuffer) + } + } -Connection.prototype.end = function () { - // 0x58 = 'X' - this._ending = true - if (!this._connecting || !this.stream.writable) { - this.stream.end() - return + sync() { + this._ending = true + this._send(flushBuffer) + this._send(syncBuffer) } - return this.stream.write(endBuffer, () => { - this.stream.end() - }) -} -Connection.prototype.close = function (msg) { - this._send(serialize.close(msg)) -} + end() { + // 0x58 = 'X' + this._ending = true + if (!this._connecting || !this.stream.writable) { + this.stream.end() + return + } + return this.stream.write(endBuffer, () => { + this.stream.end() + }) + } -Connection.prototype.describe = function (msg) { - this._send(serialize.describe(msg)) -} + close(msg) { + this._send(serialize.close(msg)) + } -Connection.prototype.sendCopyFromChunk = function (chunk) { - this._send(serialize.copyData(chunk)) -} + describe(msg) { + this._send(serialize.describe(msg)) + } -Connection.prototype.endCopyFrom = function () { - this._send(serialize.copyDone()) -} + sendCopyFromChunk(chunk) { + this._send(serialize.copyData(chunk)) + } -Connection.prototype.sendCopyFail = function (msg) { - this._send(serialize.copyFail(msg)) + endCopyFrom() { + this._send(serialize.copyDone()) + } + + sendCopyFail(msg) { + this._send(serialize.copyFail(msg)) + } } module.exports = Connection From 9bf31060e162cd9f652ac63072a1dd6fd68e32f6 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 12:22:13 -0500 Subject: [PATCH 07/12] Cleanup some dead code --- packages/pg/lib/connection.js | 2 -- packages/pg/lib/query.js | 55 ++++++++++++++--------------------- 2 files changed, 22 insertions(+), 35 deletions(-) diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 0aa3c0969..2142a401b 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -154,13 +154,11 @@ class Connection extends EventEmitter { } // send bind message - // "more" === true to buffer the message until flush() is called bind(config) { this._send(serialize.bind(config)) } // send execute message - // "more" === true to buffer the message until flush() is called execute(config) { this._send(serialize.execute(config)) } diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 2392b710e..37098ac82 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -176,30 +176,26 @@ class Query extends EventEmitter { } _getRows(connection, rows) { - connection.execute( - { - portal: this.portal, - rows: rows, - }, - true - ) + connection.execute({ + portal: this.portal, + rows: rows, + }) connection.flush() } + // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY prepare(connection) { // prepared statements need sync to be called after each command // complete or when an error is encountered this.isPreparedStatement = true + // TODO refactor this poor encapsulation if (!this.hasBeenParsed(connection)) { - connection.parse( - { - text: this.text, - name: this.name, - types: this.types, - }, - true - ) + connection.parse({ + text: this.text, + name: this.name, + types: this.types, + }) } if (this.values) { @@ -211,24 +207,17 @@ class Query extends EventEmitter { } } - // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY - connection.bind( - { - portal: this.portal, - statement: this.name, - values: this.values, - binary: this.binary, - }, - true - ) - - connection.describe( - { - type: 'P', - name: this.portal || '', - }, - true - ) + connection.bind({ + portal: this.portal, + statement: this.name, + values: this.values, + binary: this.binary, + }) + + connection.describe({ + type: 'P', + name: this.portal || '', + }) this._getRows(connection, this.rows) } From 966278a5ccbacca762bbebff6e7d9f06c14b8a59 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 12:59:10 -0500 Subject: [PATCH 08/12] Instance bound methods are not supported in node 8 --- packages/pg/lib/client.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 1cac61f8b..600cf89fd 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -181,8 +181,8 @@ class Client extends EventEmitter { con.on('authenticationSASLContinue', this._handleAuthSASLContinue.bind(this)) con.on('authenticationSASLFinal', this._handleAuthSASLFinal.bind(this)) con.on('backendKeyData', this._handleBackendKeyData.bind(this)) - con.on('error', this._handleErrorEvent) - con.on('errorMessage', this._handleErrorMessage) + con.on('error', this._handleErrorEvent.bind(this)) + con.on('errorMessage', this._handleErrorMessage.bind(this)) con.on('readyForQuery', this._handleReadyForQuery.bind(this)) con.on('notice', this._handleNotice.bind(this)) con.on('rowDescription', this._handleRowDescription.bind(this)) @@ -295,7 +295,7 @@ class Client extends EventEmitter { // if we receieve an error event or error message // during the connection process we handle it here - _handleErrorWhileConnecting = (err) => { + _handleErrorWhileConnecting(err) { if (this._connectionError) { // TODO(bmc): this is swallowing errors - we shouldn't do this return @@ -311,7 +311,7 @@ class Client extends EventEmitter { // if we're connected and we receive an error event from the connection // this means the socket is dead - do a hard abort of all queries and emit // the socket error on the client as well - _handleErrorEvent = (err) => { + _handleErrorEvent(err) { if (this._connecting) { return this._handleErrorWhileConnecting(err) } @@ -321,7 +321,7 @@ class Client extends EventEmitter { } // handle error messages from the postgres backend - _handleErrorMessage = (msg) => { + _handleErrorMessage(msg) { if (this._connecting) { return this._handleErrorWhileConnecting(msg) } From 5425bc15d2c23caadaa2dcf30b636cde68bab8aa Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 13:19:45 -0500 Subject: [PATCH 09/12] Fix untested pgpass code --- packages/pg/lib/client.js | 57 +++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 600cf89fd..842de57f9 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -199,36 +199,35 @@ class Client extends EventEmitter { // TODO(bmc): deprecate pgpass "built in" integration since this.password can be a function // it can be supplied by the user if required - this is a breaking change! _checkPgPass(cb) { - return function (msg) { - if (typeof this.password === 'function') { - this._Promise - .resolve() - .then(() => this.password()) - .then((pass) => { - if (pass !== undefined) { - if (typeof pass !== 'string') { - con.emit('error', new TypeError('Password must be a string')) - return - } - this.connectionParameters.password = this.password = pass - } else { - this.connectionParameters.password = this.password = null + const con = this.connection + if (typeof this.password === 'function') { + this._Promise + .resolve() + .then(() => this.password()) + .then((pass) => { + if (pass !== undefined) { + if (typeof pass !== 'string') { + con.emit('error', new TypeError('Password must be a string')) + return } - cb(msg) - }) - .catch((err) => { - con.emit('error', err) - }) - } else if (this.password !== null) { - cb(msg) - } else { - pgPass(this.connectionParameters, function (pass) { - if (undefined !== pass) { this.connectionParameters.password = this.password = pass + } else { + this.connectionParameters.password = this.password = null } - cb(msg) + cb() }) - } + .catch((err) => { + con.emit('error', err) + }) + } else if (this.password !== null) { + cb() + } else { + pgPass(this.connectionParameters, function (pass) { + if (undefined !== pass) { + this.connectionParameters.password = this.password = pass + } + cb() + }) } } @@ -239,14 +238,14 @@ class Client extends EventEmitter { } _handleAuthMD5Password(msg) { - this._checkPgPass((msg) => { + this._checkPgPass(() => { const hashedPassword = utils.postgresMd5PasswordHash(this.user, this.password, msg.salt) this.connection.password(hashedPassword) }) } - _handleAuthSASL(msg) { - this._checkPgPass((msg) => { + _handleAuthSASL() { + this._checkPgPass(() => { this.saslSession = sasl.startSession(msg.mechanisms) const con = this.connection con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) From fdf13bac3476bcba581605cbb61028017d583fb2 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 13:30:25 -0500 Subject: [PATCH 10/12] Fix msg not being passed for SASL --- packages/pg/lib/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 842de57f9..cf465c44b 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -244,7 +244,7 @@ class Client extends EventEmitter { }) } - _handleAuthSASL() { + _handleAuthSASL(msg) { this._checkPgPass(() => { this.saslSession = sasl.startSession(msg.mechanisms) const con = this.connection From 66d32c6f3fdf74d24e50bb1409d9ddab689e0aec Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 13:38:34 -0500 Subject: [PATCH 11/12] Fix more SASL. Thank God for tests. --- packages/pg/lib/client.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index cf465c44b..ec1dd47c2 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -248,7 +248,7 @@ class Client extends EventEmitter { this._checkPgPass(() => { this.saslSession = sasl.startSession(msg.mechanisms) const con = this.connection - con.sendSASLInitialResponseMessage(saslSession.mechanism, saslSession.response) + con.sendSASLInitialResponseMessage(this.saslSession.mechanism, this.saslSession.response) }) } From 9ba4ebb80314fcc3dd752bdbaad472c79d9ffa50 Mon Sep 17 00:00:00 2001 From: "Brian M. Carlson" Date: Wed, 15 Jul 2020 13:53:12 -0500 Subject: [PATCH 12/12] Fix SASL again --- packages/pg/lib/client.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index ec1dd47c2..bc91924e6 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -247,15 +247,13 @@ class Client extends EventEmitter { _handleAuthSASL(msg) { this._checkPgPass(() => { this.saslSession = sasl.startSession(msg.mechanisms) - const con = this.connection - con.sendSASLInitialResponseMessage(this.saslSession.mechanism, this.saslSession.response) + this.connection.sendSASLInitialResponseMessage(this.saslSession.mechanism, this.saslSession.response) }) } _handleAuthSASLContinue(msg) { - const { saslSession } = this - sasl.continueSession(saslSession, this.password, msg.data) - con.sendSCRAMClientFinalMessage(saslSession.response) + sasl.continueSession(this.saslSession, this.password, msg.data) + this.connection.sendSCRAMClientFinalMessage(this.saslSession.response) } _handleAuthSASLFinal(msg) {