From ab33346cbdb19df89d1c0ca8b7691379089eb4f1 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Wed, 29 Mar 2023 21:25:27 -0400 Subject: [PATCH] Add the instance struct to handle connections The intent is to use the instance struct to hold the connection to the database as well as metadata about the instance. Currently this metadata only includes the version of postgres for the instance which can be used in the collectors to decide what query to run. In the future this could hold more metadata but for now it keeps the Collector interface arguments to a reasonable number. Signed-off-by: Joe Adams --- collector/collector.go | 18 ++---- collector/instance.go | 85 +++++++++++++++++++++++++ collector/pg_database.go | 4 +- collector/pg_database_test.go | 4 +- collector/pg_postmaster.go | 4 +- collector/pg_postmaster_test.go | 4 +- collector/pg_process_idle.go | 4 +- collector/pg_replication_slot.go | 4 +- collector/pg_replication_slot_test.go | 8 ++- collector/pg_stat_bgwriter.go | 4 +- collector/pg_stat_bgwriter_test.go | 4 +- collector/pg_stat_database.go | 3 +- collector/pg_stat_statements.go | 4 +- collector/pg_stat_statements_test.go | 4 +- collector/pg_stat_user_tables.go | 4 +- collector/pg_stat_user_tables_test.go | 4 +- collector/pg_statio_user_tables.go | 4 +- collector/pg_statio_user_tables_test.go | 4 +- collector/probe.go | 13 ++-- 19 files changed, 139 insertions(+), 44 deletions(-) create mode 100644 collector/instance.go diff --git a/collector/collector.go b/collector/collector.go index d50e1e7..c1bf2af 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "errors" "fmt" "sync" @@ -59,7 +58,7 @@ var ( ) type Collector interface { - Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error + Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error } type collectorConfig struct { @@ -92,7 +91,7 @@ type PostgresCollector struct { Collectors map[string]Collector logger log.Logger - db *sql.DB + instance *instance } type Option func(*PostgresCollector) error @@ -149,14 +148,11 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri return nil, errors.New("empty dsn") } - db, err := sql.Open("postgres", dsn) + instance, err := newInstance(dsn) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - - p.db = db + p.instance = instance return p, nil } @@ -174,16 +170,16 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(p.Collectors)) for name, c := range p.Collectors { go func(name string, c Collector) { - execute(ctx, name, c, p.db, ch, p.logger) + execute(ctx, name, c, p.instance, ch, p.logger) wg.Done() }(name, c) } wg.Wait() } -func execute(ctx context.Context, name string, c Collector, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) { +func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) { begin := time.Now() - err := c.Update(ctx, db, ch) + err := c.Update(ctx, instance, ch) duration := time.Since(begin) var success float64 diff --git a/collector/instance.go b/collector/instance.go new file mode 100644 index 0000000..9b2bbf4 --- /dev/null +++ b/collector/instance.go @@ -0,0 +1,85 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "database/sql" + "fmt" + "regexp" + + "github.com/blang/semver/v4" +) + +type instance struct { + db *sql.DB + version semver.Version +} + +func newInstance(dsn string) (*instance, error) { + i := &instance{} + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + i.db = db + + version, err := queryVersion(db) + if err != nil { + db.Close() + return nil, err + } + + i.version = version + + return i, nil +} + +func (i *instance) getDB() *sql.DB { + return i.db +} + +func (i *instance) Close() error { + return i.db.Close() +} + +// Regex used to get the "short-version" from the postgres version field. +// The result of SELECT version() is something like "PostgreSQL 9.6.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 6.2.1 20160830, 64-bit" +var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`) +var serverVersionRegex = regexp.MustCompile(`^((\d+)(\.\d+)?(\.\d+)?)`) + +func queryVersion(db *sql.DB) (semver.Version, error) { + var version string + err := db.QueryRow("SELECT version();").Scan(&version) + if err != nil { + return semver.Version{}, err + } + submatches := versionRegex.FindStringSubmatch(version) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + + // We could also try to parse the version from the server_version field. + // This is of the format 13.3 (Debian 13.3-1.pgdg100+1) + err = db.QueryRow("SHOW server_version;").Scan(&version) + if err != nil { + return semver.Version{}, err + } + submatches = serverVersionRegex.FindStringSubmatch(version) + if len(submatches) > 1 { + return semver.ParseTolerant(submatches[1]) + } + return semver.Version{}, fmt.Errorf("could not parse version from %q", version) +} diff --git a/collector/pg_database.go b/collector/pg_database.go index 661f84c..a4ea50d 100644 --- a/collector/pg_database.go +++ b/collector/pg_database.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -66,7 +65,8 @@ var ( // each database individually. This is because we can't filter the // list of databases in the query because the list of excluded // databases is dynamic. -func (c PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() // Query the list of databases rows, err := db.QueryContext(ctx, pgDatabaseQuery, diff --git a/collector/pg_database_test.go b/collector/pg_database_test.go index bb108bb..058a6d2 100644 --- a/collector/pg_database_test.go +++ b/collector/pg_database_test.go @@ -29,6 +29,8 @@ func TestPGDatabaseCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}). AddRow("postgres")) @@ -39,7 +41,7 @@ func TestPGDatabaseCollector(t *testing.T) { go func() { defer close(ch) c := PGDatabaseCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGDatabaseCollector.Update: %s", err) } }() diff --git a/collector/pg_postmaster.go b/collector/pg_postmaster.go index 4a0cec6..eae82d5 100644 --- a/collector/pg_postmaster.go +++ b/collector/pg_postmaster.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/prometheus/client_golang/prometheus" ) @@ -47,7 +46,8 @@ var ( pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();" ) -func (c *PGPostmasterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, pgPostmasterQuery) diff --git a/collector/pg_postmaster_test.go b/collector/pg_postmaster_test.go index 9b93a5c..c40fe03 100644 --- a/collector/pg_postmaster_test.go +++ b/collector/pg_postmaster_test.go @@ -29,6 +29,8 @@ func TestPgPostmasterCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). AddRow(1685739904)) @@ -37,7 +39,7 @@ func TestPgPostmasterCollector(t *testing.T) { defer close(ch) c := PGPostmasterCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) } }() diff --git a/collector/pg_process_idle.go b/collector/pg_process_idle.go index 8ee65a4..0624497 100644 --- a/collector/pg_process_idle.go +++ b/collector/pg_process_idle.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -42,7 +41,8 @@ var pgProcessIdleSeconds = prometheus.NewDesc( prometheus.Labels{}, ) -func (PGProcessIdleCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, `WITH metrics AS ( diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 8f105ff..4278923 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -73,7 +72,8 @@ var ( pg_replication_slots;` ) -func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, pgReplicationSlotQuery) if err != nil { diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 53bafaf..cb25b75 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -29,6 +29,8 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). AddRow("test_slot", 5, 3, true) @@ -39,7 +41,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { defer close(ch) c := PGReplicationSlotCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) } }() @@ -68,6 +70,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} rows := sqlmock.NewRows(columns). AddRow("test_slot", 6, 12, false) @@ -78,7 +82,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { defer close(ch) c := PGReplicationSlotCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_bgwriter.go b/collector/pg_stat_bgwriter.go index 5daf606..2bdef8d 100644 --- a/collector/pg_stat_bgwriter.go +++ b/collector/pg_stat_bgwriter.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "time" "github.com/prometheus/client_golang/prometheus" @@ -117,7 +116,8 @@ var ( FROM pg_stat_bgwriter;` ) -func (PGStatBGWriterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() row := db.QueryRowContext(ctx, statBGWriterQuery) diff --git a/collector/pg_stat_bgwriter_test.go b/collector/pg_stat_bgwriter_test.go index 54f625c..11f55f6 100644 --- a/collector/pg_stat_bgwriter_test.go +++ b/collector/pg_stat_bgwriter_test.go @@ -30,6 +30,8 @@ func TestPGStatBGWriterCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{ "checkpoints_timed", "checkpoints_req", @@ -57,7 +59,7 @@ func TestPGStatBGWriterCollector(t *testing.T) { defer close(ch) c := PGStatBGWriterCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_database.go b/collector/pg_stat_database.go index 346ed9e..bb39a84 100644 --- a/collector/pg_stat_database.go +++ b/collector/pg_stat_database.go @@ -204,7 +204,8 @@ var ( ) ) -func (PGStatDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, `SELECT datid diff --git a/collector/pg_stat_statements.go b/collector/pg_stat_statements.go index 23e1f15..eb629c3 100644 --- a/collector/pg_stat_statements.go +++ b/collector/pg_stat_statements.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -92,7 +91,8 @@ var ( LIMIT 100;` ) -func (PGStatStatementsCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, pgStatStatementsQuery) diff --git a/collector/pg_stat_statements_test.go b/collector/pg_stat_statements_test.go index a5c5cab..241699a 100644 --- a/collector/pg_stat_statements_test.go +++ b/collector/pg_stat_statements_test.go @@ -29,6 +29,8 @@ func TestPGStateStatementsCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} rows := sqlmock.NewRows(columns). AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) @@ -39,7 +41,7 @@ func TestPGStateStatementsCollector(t *testing.T) { defer close(ch) c := PGStatStatementsCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) } }() diff --git a/collector/pg_stat_user_tables.go b/collector/pg_stat_user_tables.go index 05aced9..48ae96e 100644 --- a/collector/pg_stat_user_tables.go +++ b/collector/pg_stat_user_tables.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "time" "github.com/go-kit/log" @@ -179,7 +178,8 @@ var ( pg_stat_user_tables` ) -func (c *PGStatUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, statUserTablesQuery) diff --git a/collector/pg_stat_user_tables_test.go b/collector/pg_stat_user_tables_test.go index 29b5d15..8bb9bc3 100644 --- a/collector/pg_stat_user_tables_test.go +++ b/collector/pg_stat_user_tables_test.go @@ -30,6 +30,8 @@ func TestPGStatUserTablesCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + lastVacuumTime, err := time.Parse("2006-01-02Z", "2023-06-02Z") if err != nil { t.Fatalf("Error parsing vacuum time: %s", err) @@ -99,7 +101,7 @@ func TestPGStatUserTablesCollector(t *testing.T) { defer close(ch) c := PGStatUserTablesCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err) } }() diff --git a/collector/pg_statio_user_tables.go b/collector/pg_statio_user_tables.go index 043433d..03d5416 100644 --- a/collector/pg_statio_user_tables.go +++ b/collector/pg_statio_user_tables.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -100,7 +99,8 @@ var ( FROM pg_statio_user_tables` ) -func (PGStatIOUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { +func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() rows, err := db.QueryContext(ctx, statioUserTablesQuery) diff --git a/collector/pg_statio_user_tables_test.go b/collector/pg_statio_user_tables_test.go index 0a7174d..d57cab9 100644 --- a/collector/pg_statio_user_tables_test.go +++ b/collector/pg_statio_user_tables_test.go @@ -29,6 +29,8 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { } defer db.Close() + inst := &instance{db: db} + columns := []string{ "datname", "schemaname", @@ -60,7 +62,7 @@ func TestPGStatIOUserTablesCollector(t *testing.T) { defer close(ch) c := PGStatIOUserTablesCollector{} - if err := c.Update(context.Background(), db, ch); err != nil { + if err := c.Update(context.Background(), inst, ch); err != nil { t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err) } }() diff --git a/collector/probe.go b/collector/probe.go index 9044c40..834c651 100644 --- a/collector/probe.go +++ b/collector/probe.go @@ -15,7 +15,6 @@ package collector import ( "context" - "database/sql" "sync" "github.com/go-kit/log" @@ -27,7 +26,7 @@ type ProbeCollector struct { registry *prometheus.Registry collectors map[string]Collector logger log.Logger - db *sql.DB + instance *instance } func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { @@ -58,18 +57,16 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p } } - db, err := sql.Open("postgres", dsn.GetConnectionString()) + instance, err := newInstance(dsn.GetConnectionString()) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) return &ProbeCollector{ registry: registry, collectors: collectors, logger: logger, - db: db, + instance: instance, }, nil } @@ -81,7 +78,7 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { wg.Add(len(pc.collectors)) for name, c := range pc.collectors { go func(name string, c Collector) { - execute(context.TODO(), name, c, pc.db, ch, pc.logger) + execute(context.TODO(), name, c, pc.instance, ch, pc.logger) wg.Done() }(name, c) } @@ -89,5 +86,5 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) { } func (pc *ProbeCollector) Close() error { - return pc.db.Close() + return pc.instance.Close() }