Merge pull request #785 from sysadmind/collector-instance

Add the instance struct to handle connections
This commit is contained in:
Ben Kochie
2023-06-22 08:18:12 +02:00
committed by GitHub
19 changed files with 139 additions and 44 deletions

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
@@ -59,7 +58,7 @@ var (
) )
type Collector interface { type Collector interface {
Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error
} }
type collectorConfig struct { type collectorConfig struct {
@@ -92,7 +91,7 @@ type PostgresCollector struct {
Collectors map[string]Collector Collectors map[string]Collector
logger log.Logger logger log.Logger
db *sql.DB instance *instance
} }
type Option func(*PostgresCollector) error type Option func(*PostgresCollector) error
@@ -149,14 +148,11 @@ func NewPostgresCollector(logger log.Logger, excludeDatabases []string, dsn stri
return nil, errors.New("empty dsn") return nil, errors.New("empty dsn")
} }
db, err := sql.Open("postgres", dsn) instance, err := newInstance(dsn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
db.SetMaxOpenConns(1) p.instance = instance
db.SetMaxIdleConns(1)
p.db = db
return p, nil return p, nil
} }
@@ -174,16 +170,16 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
wg.Add(len(p.Collectors)) wg.Add(len(p.Collectors))
for name, c := range p.Collectors { for name, c := range p.Collectors {
go func(name string, c Collector) { go func(name string, c Collector) {
execute(ctx, name, c, p.db, ch, p.logger) execute(ctx, name, c, p.instance, ch, p.logger)
wg.Done() wg.Done()
}(name, c) }(name, c)
} }
wg.Wait() wg.Wait()
} }
func execute(ctx context.Context, name string, c Collector, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) { func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger log.Logger) {
begin := time.Now() begin := time.Now()
err := c.Update(ctx, db, ch) err := c.Update(ctx, instance, ch)
duration := time.Since(begin) duration := time.Since(begin)
var success float64 var success float64

85
collector/instance.go Normal file
View File

@@ -0,0 +1,85 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collector
import (
"database/sql"
"fmt"
"regexp"
"github.com/blang/semver/v4"
)
type instance struct {
db *sql.DB
version semver.Version
}
func newInstance(dsn string) (*instance, error) {
i := &instance{}
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
i.db = db
version, err := queryVersion(db)
if err != nil {
db.Close()
return nil, err
}
i.version = version
return i, nil
}
func (i *instance) getDB() *sql.DB {
return i.db
}
func (i *instance) Close() error {
return i.db.Close()
}
// Regex used to get the "short-version" from the postgres version field.
// The result of SELECT version() is something like "PostgreSQL 9.6.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 6.2.1 20160830, 64-bit"
var versionRegex = regexp.MustCompile(`^\w+ ((\d+)(\.\d+)?(\.\d+)?)`)
var serverVersionRegex = regexp.MustCompile(`^((\d+)(\.\d+)?(\.\d+)?)`)
func queryVersion(db *sql.DB) (semver.Version, error) {
var version string
err := db.QueryRow("SELECT version();").Scan(&version)
if err != nil {
return semver.Version{}, err
}
submatches := versionRegex.FindStringSubmatch(version)
if len(submatches) > 1 {
return semver.ParseTolerant(submatches[1])
}
// We could also try to parse the version from the server_version field.
// This is of the format 13.3 (Debian 13.3-1.pgdg100+1)
err = db.QueryRow("SHOW server_version;").Scan(&version)
if err != nil {
return semver.Version{}, err
}
submatches = serverVersionRegex.FindStringSubmatch(version)
if len(submatches) > 1 {
return semver.ParseTolerant(submatches[1])
}
return semver.Version{}, fmt.Errorf("could not parse version from %q", version)
}

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -66,7 +65,8 @@ var (
// each database individually. This is because we can't filter the // each database individually. This is because we can't filter the
// list of databases in the query because the list of excluded // list of databases in the query because the list of excluded
// databases is dynamic. // databases is dynamic.
func (c PGDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
// Query the list of databases // Query the list of databases
rows, err := db.QueryContext(ctx, rows, err := db.QueryContext(ctx,
pgDatabaseQuery, pgDatabaseQuery,

View File

@@ -29,6 +29,8 @@ func TestPGDatabaseCollector(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}). mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}).
AddRow("postgres")) AddRow("postgres"))
@@ -39,7 +41,7 @@ func TestPGDatabaseCollector(t *testing.T) {
go func() { go func() {
defer close(ch) defer close(ch)
c := PGDatabaseCollector{} c := PGDatabaseCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseCollector.Update: %s", err) t.Errorf("Error calling PGDatabaseCollector.Update: %s", err)
} }
}() }()

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -47,7 +46,8 @@ var (
pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();" pgPostmasterQuery = "SELECT pg_postmaster_start_time from pg_postmaster_start_time();"
) )
func (c *PGPostmasterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx, row := db.QueryRowContext(ctx,
pgPostmasterQuery) pgPostmasterQuery)

View File

@@ -29,6 +29,8 @@ func TestPgPostmasterCollector(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}). mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}).
AddRow(1685739904)) AddRow(1685739904))
@@ -37,7 +39,7 @@ func TestPgPostmasterCollector(t *testing.T) {
defer close(ch) defer close(ch)
c := PGPostmasterCollector{} c := PGPostmasterCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
} }
}() }()

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -42,7 +41,8 @@ var pgProcessIdleSeconds = prometheus.NewDesc(
prometheus.Labels{}, prometheus.Labels{},
) )
func (PGProcessIdleCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx, row := db.QueryRowContext(ctx,
`WITH `WITH
metrics AS ( metrics AS (

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -73,7 +72,8 @@ var (
pg_replication_slots;` pg_replication_slots;`
) )
func (PGReplicationSlotCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, rows, err := db.QueryContext(ctx,
pgReplicationSlotQuery) pgReplicationSlotQuery)
if err != nil { if err != nil {

View File

@@ -29,6 +29,8 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow("test_slot", 5, 3, true) AddRow("test_slot", 5, 3, true)
@@ -39,7 +41,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
defer close(ch) defer close(ch)
c := PGReplicationSlotCollector{} c := PGReplicationSlotCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err) t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
} }
}() }()
@@ -68,6 +70,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"} columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow("test_slot", 6, 12, false) AddRow("test_slot", 6, 12, false)
@@ -78,7 +82,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
defer close(ch) defer close(ch)
c := PGReplicationSlotCollector{} c := PGReplicationSlotCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err) t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
} }
}() }()

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -117,7 +116,8 @@ var (
FROM pg_stat_bgwriter;` FROM pg_stat_bgwriter;`
) )
func (PGStatBGWriterCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (PGStatBGWriterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx, row := db.QueryRowContext(ctx,
statBGWriterQuery) statBGWriterQuery)

View File

@@ -30,6 +30,8 @@ func TestPGStatBGWriterCollector(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
columns := []string{ columns := []string{
"checkpoints_timed", "checkpoints_timed",
"checkpoints_req", "checkpoints_req",
@@ -57,7 +59,7 @@ func TestPGStatBGWriterCollector(t *testing.T) {
defer close(ch) defer close(ch)
c := PGStatBGWriterCollector{} c := PGStatBGWriterCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err) t.Errorf("Error calling PGStatBGWriterCollector.Update: %s", err)
} }
}() }()

View File

@@ -204,7 +204,8 @@ var (
) )
) )
func (PGStatDatabaseCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (PGStatDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, rows, err := db.QueryContext(ctx,
`SELECT `SELECT
datid datid

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -92,7 +91,8 @@ var (
LIMIT 100;` LIMIT 100;`
) )
func (PGStatStatementsCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, rows, err := db.QueryContext(ctx,
pgStatStatementsQuery) pgStatStatementsQuery)

View File

@@ -29,6 +29,8 @@ func TestPGStateStatementsCollector(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"} columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns). rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2) AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
@@ -39,7 +41,7 @@ func TestPGStateStatementsCollector(t *testing.T) {
defer close(ch) defer close(ch)
c := PGStatStatementsCollector{} c := PGStatStatementsCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err) t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
} }
}() }()

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"time" "time"
"github.com/go-kit/log" "github.com/go-kit/log"
@@ -179,7 +178,8 @@ var (
pg_stat_user_tables` pg_stat_user_tables`
) )
func (c *PGStatUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (c *PGStatUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, rows, err := db.QueryContext(ctx,
statUserTablesQuery) statUserTablesQuery)

View File

@@ -30,6 +30,8 @@ func TestPGStatUserTablesCollector(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
lastVacuumTime, err := time.Parse("2006-01-02Z", "2023-06-02Z") lastVacuumTime, err := time.Parse("2006-01-02Z", "2023-06-02Z")
if err != nil { if err != nil {
t.Fatalf("Error parsing vacuum time: %s", err) t.Fatalf("Error parsing vacuum time: %s", err)
@@ -99,7 +101,7 @@ func TestPGStatUserTablesCollector(t *testing.T) {
defer close(ch) defer close(ch)
c := PGStatUserTablesCollector{} c := PGStatUserTablesCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err) t.Errorf("Error calling PGStatUserTablesCollector.Update: %s", err)
} }
}() }()

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -100,7 +99,8 @@ var (
FROM pg_statio_user_tables` FROM pg_statio_user_tables`
) )
func (PGStatIOUserTablesCollector) Update(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric) error { func (PGStatIOUserTablesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
rows, err := db.QueryContext(ctx, rows, err := db.QueryContext(ctx,
statioUserTablesQuery) statioUserTablesQuery)

View File

@@ -29,6 +29,8 @@ func TestPGStatIOUserTablesCollector(t *testing.T) {
} }
defer db.Close() defer db.Close()
inst := &instance{db: db}
columns := []string{ columns := []string{
"datname", "datname",
"schemaname", "schemaname",
@@ -60,7 +62,7 @@ func TestPGStatIOUserTablesCollector(t *testing.T) {
defer close(ch) defer close(ch)
c := PGStatIOUserTablesCollector{} c := PGStatIOUserTablesCollector{}
if err := c.Update(context.Background(), db, ch); err != nil { if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err) t.Errorf("Error calling PGStatIOUserTablesCollector.Update: %s", err)
} }
}() }()

View File

@@ -15,7 +15,6 @@ package collector
import ( import (
"context" "context"
"database/sql"
"sync" "sync"
"github.com/go-kit/log" "github.com/go-kit/log"
@@ -27,7 +26,7 @@ type ProbeCollector struct {
registry *prometheus.Registry registry *prometheus.Registry
collectors map[string]Collector collectors map[string]Collector
logger log.Logger logger log.Logger
db *sql.DB instance *instance
} }
func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) { func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *prometheus.Registry, dsn config.DSN) (*ProbeCollector, error) {
@@ -58,18 +57,16 @@ func NewProbeCollector(logger log.Logger, excludeDatabases []string, registry *p
} }
} }
db, err := sql.Open("postgres", dsn.GetConnectionString()) instance, err := newInstance(dsn.GetConnectionString())
if err != nil { if err != nil {
return nil, err return nil, err
} }
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
return &ProbeCollector{ return &ProbeCollector{
registry: registry, registry: registry,
collectors: collectors, collectors: collectors,
logger: logger, logger: logger,
db: db, instance: instance,
}, nil }, nil
} }
@@ -81,7 +78,7 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) {
wg.Add(len(pc.collectors)) wg.Add(len(pc.collectors))
for name, c := range pc.collectors { for name, c := range pc.collectors {
go func(name string, c Collector) { go func(name string, c Collector) {
execute(context.TODO(), name, c, pc.db, ch, pc.logger) execute(context.TODO(), name, c, pc.instance, ch, pc.logger)
wg.Done() wg.Done()
}(name, c) }(name, c)
} }
@@ -89,5 +86,5 @@ func (pc *ProbeCollector) Collect(ch chan<- prometheus.Metric) {
} }
func (pc *ProbeCollector) Close() error { func (pc *ProbeCollector) Close() error {
return pc.db.Close() return pc.instance.Close()
} }