Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

#### 2.0.0

- Don't collect host metrics if a query/batch observer is not provided (CASSGO-90)

#### 2.0.0-rc1

- Support vector type (CASSGO-11)
Expand Down
2 changes: 1 addition & 1 deletion cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ func TestQueryStats(t *testing.T) {
t.Fatal("expected at least 1 attempt, but got 0")
}
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
t.Fatalf("expected latency to be > 0, but got %v instead.", iter.Latency())
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,12 @@ func (c *Conn) executeQuery(ctx context.Context, q *internalQuery) *Iter {
newQry := new(internalQuery)
*newQry = *q
newQry.pageState = copyBytes(x.meta.pagingState)
newQry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
newQry.metrics = &queryMetrics{}
if newQry.qryOpts.observer != nil {
newQry.hostMetricsManager = newHostMetricsManager()
} else {
newQry.hostMetricsManager = emptyHostMetricsManager
}

iter.next = &nextIter{
q: newQry,
Expand Down
24 changes: 12 additions & 12 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,28 +450,28 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
if err == nil {
t.Fatalf("expected error")
}
totalLatency := int64(0)
totalAttempts := int64(0)

for i, ip := range addresses {
host := &HostInfo{connectAddress: net.ParseIP(ip)}
queryMetric := iter.metrics.hostMetrics(host)
observedMetrics := observer.GetMetrics(host)

requests := int(atomic.LoadInt64(&nodes[i].nKillReq))
hostAttempts := queryMetric.Attempts
if requests != hostAttempts {
t.Fatalf("expected requests %v to match query attempts %v", requests, hostAttempts)
}

if hostAttempts != observedMetrics.Attempts {
t.Fatalf("expected observed attempts %v to match query attempts %v on host %v", observedMetrics.Attempts, hostAttempts, ip)
if requests != observedMetrics.Attempts {
t.Fatalf("expected observed attempts %v to match server requests %v on host %v", observedMetrics.Attempts, requests, ip)
}

hostLatency := queryMetric.TotalLatency
observedLatency := observedMetrics.TotalLatency
if hostLatency != observedLatency {
t.Fatalf("expected observed latency %v to match query latency %v on host %v", observedLatency, hostLatency, ip)
}
totalLatency += observedLatency
totalAttempts += int64(observedMetrics.Attempts)
}

observedLatency := totalLatency / totalAttempts
if observedLatency != iter.Latency() {
t.Fatalf("expected observed latency %v (%v/%v) to match query latency %v", observedLatency, totalLatency, totalAttempts, iter.Latency())
}

// the query will only be attempted once, but is being retried
attempts := iter.Attempts()
if attempts != rt.NumRetries {
Expand Down
7 changes: 4 additions & 3 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func (c *controlConn) withConnHost(fn func(*connHost) *Iter) *Iter {
return fn(ch)
}

return newErrIter(errNoControl, newQueryMetrics(), "", nil, nil)
return newErrIter(errNoControl, &queryMetrics{}, "", nil, nil)
}

func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
Expand All @@ -582,7 +582,8 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
newLogFieldString("statement", statement), newLogFieldError("err", iter.err))
}

iter.metrics.attempt(1, 0, c.getConn().host, false)
qry.metrics.attempt(0)
qry.hostMetricsManager.attempt(0, c.getConn().host)
if iter.err == nil || !c.retry.Attempt(qry) {
break
}
Expand All @@ -593,7 +594,7 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter

func (c *controlConn) awaitSchemaAgreement() error {
return c.withConn(func(conn *Conn) *Iter {
return newErrIter(conn.awaitSchemaAgreement(context.TODO()), newQueryMetrics(), "", nil, nil)
return newErrIter(conn.awaitSchemaAgreement(context.TODO()), &queryMetrics{}, "", nil, nil)
}).err
}

Expand Down
6 changes: 4 additions & 2 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ func TestSimpleRetryPolicy(t *testing.T) {
}

for _, c := range cases {
q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
q.hostMetricsManager = preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
if c.allow && !rt.Attempt(q) {
t.Fatalf("should allow retry after %d attempts", c.attempts)
}
Expand Down Expand Up @@ -345,7 +346,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
}

for _, c := range cases {
q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
q.metrics = &queryMetrics{totalAttempts: int64(c.attempts)}
q.hostMetricsManager = preFilledHostMetricsMetricsManager(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
if c.retryType != rt.GetRetryType(c.err) {
t.Fatalf("retry type should be %v", c.retryType)
}
Expand Down
79 changes: 49 additions & 30 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,15 @@ func newQueryOptions(q *Query, ctx context.Context) *queryOptions {
}

type internalQuery struct {
originalQuery *Query
qryOpts *queryOptions
pageState []byte
metrics *queryMetrics
conn *Conn
consistency uint32
session *Session
routingInfo *queryRoutingInfo
originalQuery *Query
qryOpts *queryOptions
pageState []byte
conn *Conn
consistency uint32
session *Session
routingInfo *queryRoutingInfo
metrics *queryMetrics
hostMetricsManager hostMetricsManager
}

func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
Expand All @@ -351,15 +352,22 @@ func newInternalQuery(q *Query, ctx context.Context) *internalQuery {
newPageState = make([]byte, len(pageState))
copy(newPageState, pageState)
}
var hostMetricsMgr hostMetricsManager
if q.observer != nil {
hostMetricsMgr = newHostMetricsManager()
} else {
hostMetricsMgr = emptyHostMetricsManager
}
return &internalQuery{
originalQuery: q,
qryOpts: newQueryOptions(q, ctx),
metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
consistency: uint32(q.initialConsistency),
pageState: newPageState,
conn: nil,
session: q.session,
routingInfo: &queryRoutingInfo{},
originalQuery: q,
qryOpts: newQueryOptions(q, ctx),
metrics: &queryMetrics{},
hostMetricsManager: hostMetricsMgr,
consistency: uint32(q.initialConsistency),
pageState: newPageState,
conn: nil,
session: q.session,
routingInfo: &queryRoutingInfo{},
}
}

Expand All @@ -370,9 +378,10 @@ func (q *internalQuery) Attempts() int {

func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
latency := end.Sub(start)
attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.qryOpts.observer != nil)
attempt := q.metrics.attempt(latency)

if q.qryOpts.observer != nil {
metricsForHost := q.hostMetricsManager.attempt(latency, host)
q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{
Keyspace: keyspace,
Statement: q.qryOpts.stmt,
Expand Down Expand Up @@ -546,22 +555,30 @@ func newBatchOptions(b *Batch, ctx context.Context) *batchOptions {
}

type internalBatch struct {
originalBatch *Batch
batchOpts *batchOptions
metrics *queryMetrics
consistency uint32
routingInfo *queryRoutingInfo
session *Session
originalBatch *Batch
batchOpts *batchOptions
consistency uint32
routingInfo *queryRoutingInfo
session *Session
metrics *queryMetrics
hostMetricsManager hostMetricsManager
}

func newInternalBatch(batch *Batch, ctx context.Context) *internalBatch {
var hostMetricsMgr hostMetricsManager
if batch.observer != nil {
hostMetricsMgr = newHostMetricsManager()
} else {
hostMetricsMgr = emptyHostMetricsManager
}
return &internalBatch{
originalBatch: batch,
batchOpts: newBatchOptions(batch, ctx),
metrics: &queryMetrics{m: make(map[string]*hostMetrics)},
routingInfo: &queryRoutingInfo{},
session: batch.session,
consistency: uint32(batch.GetConsistency()),
originalBatch: batch,
batchOpts: newBatchOptions(batch, ctx),
routingInfo: &queryRoutingInfo{},
session: batch.session,
consistency: uint32(batch.GetConsistency()),
metrics: &queryMetrics{},
hostMetricsManager: hostMetricsMgr,
}
}

Expand All @@ -572,12 +589,14 @@ func (b *internalBatch) Attempts() int {

func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
latency := end.Sub(start)
attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.batchOpts.observer != nil)
attempt := b.metrics.attempt(latency)

if b.batchOpts.observer == nil {
return
}

metricsForHost := b.hostMetricsManager.attempt(latency, host)

statements := make([]string, len(b.batchOpts.entries))
values := make([][]interface{}, len(b.batchOpts.entries))

Expand Down
Loading