Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (app *App) Start() (err error) {
receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Tcp.DropLongerThan),
receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())),
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
)

if err != nil {
Expand All @@ -274,6 +275,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Udp.DropLongerThan),
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
)

if err != nil {
Expand All @@ -292,6 +294,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Pickle.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Pickle.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Pickle.DropLongerThan),
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
)

if err != nil {
Expand All @@ -309,6 +312,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Grpc.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Grpc.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Grpc.DropLongerThan),
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
)

if err != nil {
Expand All @@ -326,6 +330,7 @@ func (app *App) Start() (err error) {
receiver.DropFuture(uint32(conf.Prometheus.DropFuture.Value().Seconds())),
receiver.DropPast(uint32(conf.Prometheus.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.Prometheus.DropLongerThan),
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
)

if err != nil {
Expand All @@ -344,6 +349,7 @@ func (app *App) Start() (err error) {
receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())),
receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan),
receiver.ConcatChar(conf.TelegrafHttpJson.Concat),
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
)

if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"regexp"
"strings"
"time"

Expand All @@ -25,6 +26,7 @@ type commonConfig struct {
MetricEndpoint string `toml:"metric-endpoint"`
MaxCPU int `toml:"max-cpu"`
Enabled bool `toml:"enabled"`
BlacklistRegex string `toml:"blacklist-regex"`
}

type clickhouseConfig struct {
Expand Down Expand Up @@ -279,6 +281,12 @@ func ReadConfig(filename string, exactConfig bool) (*Config, error) {
}
}

if cfg.Common.BlacklistRegex != "" {
if _, err := regexp.Compile(cfg.Common.BlacklistRegex); err != nil {
return nil, fmt.Errorf("invalid regex in blacklist-regex option: %s", err.Error())
}
}

if cfg.Logging == nil {
cfg.Logging = make([]zapwriter.Config, 0)
}
Expand Down
31 changes: 22 additions & 9 deletions receiver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receiver
import (
"fmt"
"net/http"
"regexp"
"sort"
"sync"
"sync/atomic"
Expand All @@ -18,15 +19,16 @@ const droppedListSize = 1000
type Base struct {
stop.Struct
stat struct {
samplesReceived uint64 // atomic
messagesReceived uint64 // atomic
metricsReceived uint64 // atomic
errors uint64 // atomic
active int64 // atomic
incompleteReceived uint64 // atomic
futureDropped uint64 // atomic
pastDropped uint64 // atomic
tooLongDropped uint64 // atomic
samplesReceived uint64 // atomic
messagesReceived uint64 // atomic
metricsReceived uint64 // atomic
errors uint64 // atomic
active int64 // atomic
incompleteReceived uint64 // atomic
futureDropped uint64 // atomic
pastDropped uint64 // atomic
tooLongDropped uint64 // atomic
blacklistRegexDropped uint64 // atomic
}
droppedList [droppedListSize]string
droppedListNext int
Expand All @@ -36,6 +38,7 @@ type Base struct {
dropPastSeconds uint32
dropTooLongLimit uint16
readTimeoutSeconds uint32
blacklistRegex *regexp.Regexp
writeChan chan *RowBinary.WriteBuffer
logger *zap.Logger
Tags tags.TagConfig
Expand Down Expand Up @@ -85,6 +88,14 @@ func (base *Base) isDropMetricNameTooLong(name string) bool {
return false
}

func (base *Base) isMatchedByBlacklistRegex(name []byte) bool {
if base.blacklistRegex != nil && base.blacklistRegex.Match(name) {
atomic.AddUint64(&base.stat.blacklistRegexDropped, 1)
return true
}
return false
}

func (base *Base) DroppedHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")

Expand Down Expand Up @@ -143,6 +154,8 @@ func (base *Base) SendStat(send func(metric string, value float64), fields ...st
sendUint64Counter(send, f, &base.stat.pastDropped)
case "tooLongDropped":
sendUint64Counter(send, f, &base.stat.tooLongDropped)
case "blacklistRegexDropped":
sendUint64Counter(send, f, &base.stat.blacklistRegexDropped)
case "errors":
sendUint64Counter(send, f, &base.stat.errors)
case "active":
Expand Down
2 changes: 1 addition & 1 deletion receiver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (g *GRPC) Addr() net.Addr {
}

func (g *GRPC) Stat(send func(metric string, value float64)) {
g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
}

// Listen bind port. Receive messages and send to out channel
Expand Down
2 changes: 1 addition & 1 deletion receiver/pickle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (rcv *Pickle) Addr() net.Addr {

func (rcv *Pickle) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "metricsReceived", "messagesReceived", "errors", "active", "futureDropped", "pastDropped",
"tooLongDropped")
"tooLongDropped", "blacklistRegexDropped")
}

func (rcv *Pickle) HandleConnection(conn net.Conn) {
Expand Down
4 changes: 4 additions & 0 deletions receiver/plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (base *Base) PlainParseLine(p []byte, now uint32, buf *tags.GraphiteBuf) ([
i3--
}

if base.isMatchedByBlacklistRegex(p[:i1]) {
return nil, 0, 0, errors.New("metric name matched by blacklist regex: '" + unsafeString(p) + "'")
}

value, err := strconv.ParseFloat(unsafeString(p[i1+1:i2]), 64)
if err != nil || math.IsNaN(value) {
return nil, 0, 0, errors.New("bad message: '" + unsafeString(p) + "'")
Expand Down
127 changes: 127 additions & 0 deletions receiver/plain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receiver
import (
"context"
"fmt"
"regexp"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -177,6 +178,8 @@ func TestPlainParseLine(t *testing.T) {
{"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189},
{"metric..name 42.15 -1\n", "metric.name", 42.15, now},
{"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189},
{"cpu.loadavg~ 21.4 1422642189\n", "cpu.loadavg~", 21.4, 1422642189},
{"cpu.loadavg~;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg~?env=test&host=host1", 21.4, 1422642189},
}

base := &Base{}
Expand All @@ -202,4 +205,128 @@ func TestPlainParseLine(t *testing.T) {
}
}
}

tableWithValidation := [](struct {
b string
name string
value float64
timestamp uint32
}){
{b: "42"},
{b: ""},
{b: "\n"},
{b: "metric..name 42 \n"},
{b: "metric..name 42"},
{b: "metric.name 42 a1422642189\n"},
{b: "metric.name 42a 1422642189\n"},
{b: "metric.name NaN 1422642189\n"},
{b: "metric.name 42 NaN\n"},
{"metric.name -42.76 1422642189\n", "metric.name", -42.76, 1422642189},
{"metric.name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
{"metric..name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
{"metric...name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
{"metric.name 42.15 1422642189\r\n", "metric.name", 42.15, 1422642189},
{"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189},
{"metric..name 42.15 -1\n", "metric.name", 42.15, now},
{"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189},

// Additional test cases for validation
// Test invalid characters in metric names
{b: "metric@name 42.15 1422642189\n"},
{b: "metric#name 42.15 1422642189\n"},
{b: "metric$name 42.15 1422642189\n"},
{b: "metric%name 42.15 1422642189\n"},
{b: "metric&name 42.15 1422642189\n"},
{b: "metric*name 42.15 1422642189\n"},
{b: "metric!name 42.15 1422642189\n"},
{b: "metric name 42.15 1422642189\n"}, // space in metric name
{b: "metric\tname 42.15 1422642189\n"}, // tab in metric name
{b: "metric[name] 42.15 1422642189\n"},
{b: "metric{name} 42.15 1422642189\n"},
{b: "metric(name) 42.15 1422642189\n"},
{b: "metric/name 42.15 1422642189\n"},
{b: "metric\\name 42.15 1422642189\n"},
{b: "metric|name 42.15 1422642189\n"},
{b: "metric?name 42.15 1422642189\n"},
{b: "metric<name> 42.15 1422642189\n"},
{b: "metric'name' 42.15 1422642189\n"},
{b: "metric\"name\" 42.15 1422642189\n"},

// Test valid characters that should pass
{"metric-name 42.15 1422642189\n", "metric-name", 42.15, 1422642189},
{"metric_name 42.15 1422642189\n", "metric_name", 42.15, 1422642189},
{"metric:name 42.15 1422642189\n", "metric:name", 42.15, 1422642189},
{"metric.sub.name 42.15 1422642189\n", "metric.sub.name", 42.15, 1422642189},
{"metric-123_test:data 42.15 1422642189\n", "metric-123_test:data", 42.15, 1422642189},

// Test invalid characters in tags
{b: "metric.name;tag@=value 42.15 1422642189\n"},
{b: "metric.name;tag=val@ue 42.15 1422642189\n"},
{b: "metric.name;t ag=value 42.15 1422642189\n"},
{b: "metric.name;tag=val ue 42.15 1422642189\n"},
{b: "metric.name;tag#key=value 42.15 1422642189\n"},
{b: "metric.name;tag=value! 42.15 1422642189\n"},
{b: "metric.name;tag=value;key=val*ue 42.15 1422642189\n"},
{b: "metric.name;tag=value;k ey=value 42.15 1422642189\n"},
{b: "metric.name;tag=value;key=val\tue 42.15 1422642189\n"},
{b: "metric.name;tag=value;key=val\nue 42.15 1422642189\n"},

// Test valid tags that should pass
{"metric.name;env=prod 42.15 1422642189\n", "metric.name?env=prod", 42.15, 1422642189},
{"metric.name;env=prod;region=us-east-1 42.15 1422642189\n", "metric.name?env=prod&region=us-east-1", 42.15, 1422642189},
{"metric.name;tag-name=tag-value 42.15 1422642189\n", "metric.name?tag-name=tag-value", 42.15, 1422642189},
{"metric.name;tag_name=tag_value 42.15 1422642189\n", "metric.name?tag_name=tag_value", 42.15, 1422642189},
{"metric.name;tag:name=tag:value 42.15 1422642189\n", "metric.name?tag%3Aname=tag%3Avalue", 42.15, 1422642189},
{"metric.name;tag.name=tag.value 42.15 1422642189\n", "metric.name?tag.name=tag.value", 42.15, 1422642189},

// Test edge cases with multiple invalid characters
{b: "metric@#$%name 42.15 1422642189\n"},
{b: "metric.name;tag@#=value$% 42.15 1422642189\n"},
{b: "met!ric.na@me;ta#g=val$ue 42.15 1422642189\n"},

// Test unicode characters (should fail validation)
{b: "metric.名前 42.15 1422642189\n"},
{b: "metric.name;tag=値 42.15 1422642189\n"},
{b: "metric.name;标签=value 42.15 1422642189\n"},
{b: "метрика.name 42.15 1422642189\n"},

// Test empty tag keys/values
{b: "metric.name;=value 42.15 1422642189\n"},
{b: "metric.name;= 42.15 1422642189\n"},

// Test metrics with numbers
{"metric123 42.15 1422642189\n", "metric123", 42.15, 1422642189},
{"123metric 42.15 1422642189\n", "123metric", 42.15, 1422642189},
{"123 42.15 1422642189\n", "123", 42.15, 1422642189},

// Test metrics with only valid special characters
{"metric-_.:name 42.15 1422642189\n", "metric-_.:name", 42.15, 1422642189},
{"metric.name;tag-_.:key=tag-_.:value 42.15 1422642189\n", "metric.name?tag-_.%3Akey=tag-_.%3Avalue", 42.15, 1422642189},

// Additional tests for colon encoding
{"host:port:metric 42.15 1422642189\n", "host:port:metric", 42.15, 1422642189},
{"metric.name;service:port=web:8080 42.15 1422642189\n", "metric.name?service%3Aport=web%3A8080", 42.15, 1422642189},
{"app:service:metric;env=prod:primary 42.15 1422642189\n", "app:service:metric?env=prod%3Aprimary", 42.15, 1422642189},
}

baseWithValidation := &Base{blacklistRegex: regexp.MustCompile(`[^a-zA-Z0-9.;\-_:=]{1}`)}
for _, p := range tableWithValidation {
name, value, timestamp, err := baseWithValidation.PlainParseLine([]byte(p.b), now, &tagBuf)
if p.name == "" {
// expected error
if err == nil {
t.Fatal("error expected")
}
} else {
if string(name) != p.name {
t.Fatalf("%#v != %#v", string(name), p.name)
}
if value != p.value {
t.Fatalf("%#v != %#v", value, p.value)
}
if timestamp != p.timestamp {
t.Fatalf("%d != %d", timestamp, p.timestamp)
}
}
}
}
2 changes: 1 addition & 1 deletion receiver/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (rcv *PrometheusRemoteWrite) Addr() net.Addr {
}

func (rcv *PrometheusRemoteWrite) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
}

// Listen bind port. Receive messages and send to out channel
Expand Down
13 changes: 13 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"net/http"
"net/url"
"regexp"
"strings"

"github.com/lomik/carbon-clickhouse/helper/RowBinary"
Expand Down Expand Up @@ -90,6 +91,18 @@ func ConcatChar(concat string) Option {
}
}

// BlacklistRegex creates option for New constructor
func BlacklistRegex(regex string) Option {
return func(r interface{}) error {
if t, ok := r.(*Base); ok {
if regex != "" {
t.blacklistRegex = regexp.MustCompile(regex)
}
}
return nil
}
}

// New creates udp, tcp, pickle receiver
func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
u, err := url.Parse(dsn)
Expand Down
2 changes: 1 addition & 1 deletion receiver/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (rcv *TCP) Addr() net.Addr {
}

func (rcv *TCP) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped")
rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
}

func (rcv *TCP) HandleConnection(conn net.Conn) {
Expand Down
2 changes: 1 addition & 1 deletion receiver/telegraf_http_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (rcv *TelegrafHttpJson) Addr() net.Addr {
}

func (rcv *TelegrafHttpJson) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
}

// Listen bind port. Receive messages and send to out channel
Expand Down
2 changes: 1 addition & 1 deletion receiver/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (rcv *UDP) Addr() net.Addr {

func (rcv *UDP) Stat(send func(metric string, value float64)) {
rcv.SendStat(send, "metricsReceived", "errors", "incompleteReceived", "futureDropped", "pastDropped",
"tooLongDropped")
"tooLongDropped", "blacklistRegexDropped")
}

func (rcv *UDP) receiveWorker(ctx context.Context) {
Expand Down
Loading