diff --git a/carbon/app.go b/carbon/app.go index bf649e85..b811c57e 100644 --- a/carbon/app.go +++ b/carbon/app.go @@ -256,6 +256,7 @@ func (app *App) Start() (err error) { receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())), receiver.DropLongerThan(conf.Tcp.DropLongerThan), receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())), + receiver.BlacklistRegex(conf.Common.BlacklistRegex), ) if err != nil { @@ -274,6 +275,7 @@ func (app *App) Start() (err error) { receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())), receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())), receiver.DropLongerThan(conf.Udp.DropLongerThan), + receiver.BlacklistRegex(conf.Common.BlacklistRegex), ) if err != nil { @@ -292,6 +294,7 @@ func (app *App) Start() (err error) { receiver.DropFuture(uint32(conf.Pickle.DropFuture.Value().Seconds())), receiver.DropPast(uint32(conf.Pickle.DropPast.Value().Seconds())), receiver.DropLongerThan(conf.Pickle.DropLongerThan), + receiver.BlacklistRegex(conf.Common.BlacklistRegex), ) if err != nil { @@ -309,6 +312,7 @@ func (app *App) Start() (err error) { receiver.DropFuture(uint32(conf.Grpc.DropFuture.Value().Seconds())), receiver.DropPast(uint32(conf.Grpc.DropPast.Value().Seconds())), receiver.DropLongerThan(conf.Grpc.DropLongerThan), + receiver.BlacklistRegex(conf.Common.BlacklistRegex), ) if err != nil { @@ -326,6 +330,7 @@ func (app *App) Start() (err error) { receiver.DropFuture(uint32(conf.Prometheus.DropFuture.Value().Seconds())), receiver.DropPast(uint32(conf.Prometheus.DropPast.Value().Seconds())), receiver.DropLongerThan(conf.Prometheus.DropLongerThan), + receiver.BlacklistRegex(conf.Common.BlacklistRegex), ) if err != nil { @@ -344,6 +349,7 @@ func (app *App) Start() (err error) { receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())), receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan), receiver.ConcatChar(conf.TelegrafHttpJson.Concat), + receiver.BlacklistRegex(conf.Common.BlacklistRegex), ) if err != nil { diff --git a/carbon/config.go b/carbon/config.go index 7b3a2fc9..20cf9bfe 100644 --- a/carbon/config.go +++ b/carbon/config.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io/ioutil" + "regexp" "strings" "time" @@ -25,6 +26,7 @@ type commonConfig struct { MetricEndpoint string `toml:"metric-endpoint"` MaxCPU int `toml:"max-cpu"` Enabled bool `toml:"enabled"` + BlacklistRegex string `toml:"blacklist-regex"` } type clickhouseConfig struct { @@ -279,6 +281,12 @@ func ReadConfig(filename string, exactConfig bool) (*Config, error) { } } + if cfg.Common.BlacklistRegex != "" { + if _, err := regexp.Compile(cfg.Common.BlacklistRegex); err != nil { + return nil, fmt.Errorf("invalid regex in blacklist-regex option: %s", err.Error()) + } + } + if cfg.Logging == nil { cfg.Logging = make([]zapwriter.Config, 0) } diff --git a/receiver/base.go b/receiver/base.go index 78319cfc..fa994d76 100644 --- a/receiver/base.go +++ b/receiver/base.go @@ -3,6 +3,7 @@ package receiver import ( "fmt" "net/http" + "regexp" "sort" "sync" "sync/atomic" @@ -18,15 +19,16 @@ const droppedListSize = 1000 type Base struct { stop.Struct stat struct { - samplesReceived uint64 // atomic - messagesReceived uint64 // atomic - metricsReceived uint64 // atomic - errors uint64 // atomic - active int64 // atomic - incompleteReceived uint64 // atomic - futureDropped uint64 // atomic - pastDropped uint64 // atomic - tooLongDropped uint64 // atomic + samplesReceived uint64 // atomic + messagesReceived uint64 // atomic + metricsReceived uint64 // atomic + errors uint64 // atomic + active int64 // atomic + incompleteReceived uint64 // atomic + futureDropped uint64 // atomic + pastDropped uint64 // atomic + tooLongDropped uint64 // atomic + blacklistRegexDropped uint64 // atomic } droppedList [droppedListSize]string droppedListNext int @@ -36,6 +38,7 @@ type Base struct { dropPastSeconds uint32 dropTooLongLimit uint16 readTimeoutSeconds uint32 + blacklistRegex *regexp.Regexp writeChan chan *RowBinary.WriteBuffer logger *zap.Logger Tags tags.TagConfig @@ -85,6 +88,14 @@ func (base *Base) isDropMetricNameTooLong(name string) bool { return false } +func (base *Base) isMatchedByBlacklistRegex(name []byte) bool { + if base.blacklistRegex != nil && base.blacklistRegex.Match(name) { + atomic.AddUint64(&base.stat.blacklistRegexDropped, 1) + return true + } + return false +} + func (base *Base) DroppedHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") @@ -143,6 +154,8 @@ func (base *Base) SendStat(send func(metric string, value float64), fields ...st sendUint64Counter(send, f, &base.stat.pastDropped) case "tooLongDropped": sendUint64Counter(send, f, &base.stat.tooLongDropped) + case "blacklistRegexDropped": + sendUint64Counter(send, f, &base.stat.blacklistRegexDropped) case "errors": sendUint64Counter(send, f, &base.stat.errors) case "active": diff --git a/receiver/grpc.go b/receiver/grpc.go index 6dad436f..aa6363c5 100644 --- a/receiver/grpc.go +++ b/receiver/grpc.go @@ -32,7 +32,7 @@ func (g *GRPC) Addr() net.Addr { } func (g *GRPC) Stat(send func(metric string, value float64)) { - g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped") + g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped") } // Listen bind port. Receive messages and send to out channel diff --git a/receiver/pickle.go b/receiver/pickle.go index 2611f685..05ed9b91 100644 --- a/receiver/pickle.go +++ b/receiver/pickle.go @@ -33,7 +33,7 @@ func (rcv *Pickle) Addr() net.Addr { func (rcv *Pickle) Stat(send func(metric string, value float64)) { rcv.SendStat(send, "metricsReceived", "messagesReceived", "errors", "active", "futureDropped", "pastDropped", - "tooLongDropped") + "tooLongDropped", "blacklistRegexDropped") } func (rcv *Pickle) HandleConnection(conn net.Conn) { diff --git a/receiver/plain.go b/receiver/plain.go index fa026658..809259ce 100644 --- a/receiver/plain.go +++ b/receiver/plain.go @@ -70,6 +70,10 @@ func (base *Base) PlainParseLine(p []byte, now uint32, buf *tags.GraphiteBuf) ([ i3-- } + if base.isMatchedByBlacklistRegex(p[:i1]) { + return nil, 0, 0, errors.New("metric name matched by blacklist regex: '" + unsafeString(p) + "'") + } + value, err := strconv.ParseFloat(unsafeString(p[i1+1:i2]), 64) if err != nil || math.IsNaN(value) { return nil, 0, 0, errors.New("bad message: '" + unsafeString(p) + "'") diff --git a/receiver/plain_test.go b/receiver/plain_test.go index 5a0d994e..074f527e 100644 --- a/receiver/plain_test.go +++ b/receiver/plain_test.go @@ -3,6 +3,7 @@ package receiver import ( "context" "fmt" + "regexp" "sync" "testing" "time" @@ -177,6 +178,8 @@ func TestPlainParseLine(t *testing.T) { {"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189}, {"metric..name 42.15 -1\n", "metric.name", 42.15, now}, {"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189}, + {"cpu.loadavg~ 21.4 1422642189\n", "cpu.loadavg~", 21.4, 1422642189}, + {"cpu.loadavg~;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg~?env=test&host=host1", 21.4, 1422642189}, } base := &Base{} @@ -202,4 +205,128 @@ func TestPlainParseLine(t *testing.T) { } } } + + tableWithValidation := [](struct { + b string + name string + value float64 + timestamp uint32 + }){ + {b: "42"}, + {b: ""}, + {b: "\n"}, + {b: "metric..name 42 \n"}, + {b: "metric..name 42"}, + {b: "metric.name 42 a1422642189\n"}, + {b: "metric.name 42a 1422642189\n"}, + {b: "metric.name NaN 1422642189\n"}, + {b: "metric.name 42 NaN\n"}, + {"metric.name -42.76 1422642189\n", "metric.name", -42.76, 1422642189}, + {"metric.name 42.15 1422642189\n", "metric.name", 42.15, 1422642189}, + {"metric..name 42.15 1422642189\n", "metric.name", 42.15, 1422642189}, + {"metric...name 42.15 1422642189\n", "metric.name", 42.15, 1422642189}, + {"metric.name 42.15 1422642189\r\n", "metric.name", 42.15, 1422642189}, + {"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189}, + {"metric..name 42.15 -1\n", "metric.name", 42.15, now}, + {"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189}, + + // Additional test cases for validation + // Test invalid characters in metric names + {b: "metric@name 42.15 1422642189\n"}, + {b: "metric#name 42.15 1422642189\n"}, + {b: "metric$name 42.15 1422642189\n"}, + {b: "metric%name 42.15 1422642189\n"}, + {b: "metric&name 42.15 1422642189\n"}, + {b: "metric*name 42.15 1422642189\n"}, + {b: "metric!name 42.15 1422642189\n"}, + {b: "metric name 42.15 1422642189\n"}, // space in metric name + {b: "metric\tname 42.15 1422642189\n"}, // tab in metric name + {b: "metric[name] 42.15 1422642189\n"}, + {b: "metric{name} 42.15 1422642189\n"}, + {b: "metric(name) 42.15 1422642189\n"}, + {b: "metric/name 42.15 1422642189\n"}, + {b: "metric\\name 42.15 1422642189\n"}, + {b: "metric|name 42.15 1422642189\n"}, + {b: "metric?name 42.15 1422642189\n"}, + {b: "metric 42.15 1422642189\n"}, + {b: "metric'name' 42.15 1422642189\n"}, + {b: "metric\"name\" 42.15 1422642189\n"}, + + // Test valid characters that should pass + {"metric-name 42.15 1422642189\n", "metric-name", 42.15, 1422642189}, + {"metric_name 42.15 1422642189\n", "metric_name", 42.15, 1422642189}, + {"metric:name 42.15 1422642189\n", "metric:name", 42.15, 1422642189}, + {"metric.sub.name 42.15 1422642189\n", "metric.sub.name", 42.15, 1422642189}, + {"metric-123_test:data 42.15 1422642189\n", "metric-123_test:data", 42.15, 1422642189}, + + // Test invalid characters in tags + {b: "metric.name;tag@=value 42.15 1422642189\n"}, + {b: "metric.name;tag=val@ue 42.15 1422642189\n"}, + {b: "metric.name;t ag=value 42.15 1422642189\n"}, + {b: "metric.name;tag=val ue 42.15 1422642189\n"}, + {b: "metric.name;tag#key=value 42.15 1422642189\n"}, + {b: "metric.name;tag=value! 42.15 1422642189\n"}, + {b: "metric.name;tag=value;key=val*ue 42.15 1422642189\n"}, + {b: "metric.name;tag=value;k ey=value 42.15 1422642189\n"}, + {b: "metric.name;tag=value;key=val\tue 42.15 1422642189\n"}, + {b: "metric.name;tag=value;key=val\nue 42.15 1422642189\n"}, + + // Test valid tags that should pass + {"metric.name;env=prod 42.15 1422642189\n", "metric.name?env=prod", 42.15, 1422642189}, + {"metric.name;env=prod;region=us-east-1 42.15 1422642189\n", "metric.name?env=prod®ion=us-east-1", 42.15, 1422642189}, + {"metric.name;tag-name=tag-value 42.15 1422642189\n", "metric.name?tag-name=tag-value", 42.15, 1422642189}, + {"metric.name;tag_name=tag_value 42.15 1422642189\n", "metric.name?tag_name=tag_value", 42.15, 1422642189}, + {"metric.name;tag:name=tag:value 42.15 1422642189\n", "metric.name?tag%3Aname=tag%3Avalue", 42.15, 1422642189}, + {"metric.name;tag.name=tag.value 42.15 1422642189\n", "metric.name?tag.name=tag.value", 42.15, 1422642189}, + + // Test edge cases with multiple invalid characters + {b: "metric@#$%name 42.15 1422642189\n"}, + {b: "metric.name;tag@#=value$% 42.15 1422642189\n"}, + {b: "met!ric.na@me;ta#g=val$ue 42.15 1422642189\n"}, + + // Test unicode characters (should fail validation) + {b: "metric.名前 42.15 1422642189\n"}, + {b: "metric.name;tag=値 42.15 1422642189\n"}, + {b: "metric.name;标签=value 42.15 1422642189\n"}, + {b: "метрика.name 42.15 1422642189\n"}, + + // Test empty tag keys/values + {b: "metric.name;=value 42.15 1422642189\n"}, + {b: "metric.name;= 42.15 1422642189\n"}, + + // Test metrics with numbers + {"metric123 42.15 1422642189\n", "metric123", 42.15, 1422642189}, + {"123metric 42.15 1422642189\n", "123metric", 42.15, 1422642189}, + {"123 42.15 1422642189\n", "123", 42.15, 1422642189}, + + // Test metrics with only valid special characters + {"metric-_.:name 42.15 1422642189\n", "metric-_.:name", 42.15, 1422642189}, + {"metric.name;tag-_.:key=tag-_.:value 42.15 1422642189\n", "metric.name?tag-_.%3Akey=tag-_.%3Avalue", 42.15, 1422642189}, + + // Additional tests for colon encoding + {"host:port:metric 42.15 1422642189\n", "host:port:metric", 42.15, 1422642189}, + {"metric.name;service:port=web:8080 42.15 1422642189\n", "metric.name?service%3Aport=web%3A8080", 42.15, 1422642189}, + {"app:service:metric;env=prod:primary 42.15 1422642189\n", "app:service:metric?env=prod%3Aprimary", 42.15, 1422642189}, + } + + baseWithValidation := &Base{blacklistRegex: regexp.MustCompile(`[^a-zA-Z0-9.;\-_:=]{1}`)} + for _, p := range tableWithValidation { + name, value, timestamp, err := baseWithValidation.PlainParseLine([]byte(p.b), now, &tagBuf) + if p.name == "" { + // expected error + if err == nil { + t.Fatal("error expected") + } + } else { + if string(name) != p.name { + t.Fatalf("%#v != %#v", string(name), p.name) + } + if value != p.value { + t.Fatalf("%#v != %#v", value, p.value) + } + if timestamp != p.timestamp { + t.Fatalf("%d != %d", timestamp, p.timestamp) + } + } + } } diff --git a/receiver/prometheus.go b/receiver/prometheus.go index 5f2621de..8f88ee35 100644 --- a/receiver/prometheus.go +++ b/receiver/prometheus.go @@ -199,7 +199,7 @@ func (rcv *PrometheusRemoteWrite) Addr() net.Addr { } func (rcv *PrometheusRemoteWrite) Stat(send func(metric string, value float64)) { - rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped") + rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped") } // Listen bind port. Receive messages and send to out channel diff --git a/receiver/receiver.go b/receiver/receiver.go index 2ee6c56b..06017ad7 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -5,6 +5,7 @@ import ( "net" "net/http" "net/url" + "regexp" "strings" "github.com/lomik/carbon-clickhouse/helper/RowBinary" @@ -90,6 +91,18 @@ func ConcatChar(concat string) Option { } } +// BlacklistRegex creates option for New constructor +func BlacklistRegex(regex string) Option { + return func(r interface{}) error { + if t, ok := r.(*Base); ok { + if regex != "" { + t.blacklistRegex = regexp.MustCompile(regex) + } + } + return nil + } +} + // New creates udp, tcp, pickle receiver func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) { u, err := url.Parse(dsn) diff --git a/receiver/tcp.go b/receiver/tcp.go index 81ce4581..a990cb6c 100644 --- a/receiver/tcp.go +++ b/receiver/tcp.go @@ -28,7 +28,7 @@ func (rcv *TCP) Addr() net.Addr { } func (rcv *TCP) Stat(send func(metric string, value float64)) { - rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped") + rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped") } func (rcv *TCP) HandleConnection(conn net.Conn) { diff --git a/receiver/telegraf_http_json.go b/receiver/telegraf_http_json.go index 58921ba9..d56f291b 100644 --- a/receiver/telegraf_http_json.go +++ b/receiver/telegraf_http_json.go @@ -158,7 +158,7 @@ func (rcv *TelegrafHttpJson) Addr() net.Addr { } func (rcv *TelegrafHttpJson) Stat(send func(metric string, value float64)) { - rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped") + rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped") } // Listen bind port. Receive messages and send to out channel diff --git a/receiver/udp.go b/receiver/udp.go index 5195a804..0f0b2c95 100644 --- a/receiver/udp.go +++ b/receiver/udp.go @@ -28,7 +28,7 @@ func (rcv *UDP) Addr() net.Addr { func (rcv *UDP) Stat(send func(metric string, value float64)) { rcv.SendStat(send, "metricsReceived", "errors", "incompleteReceived", "futureDropped", "pastDropped", - "tooLongDropped") + "tooLongDropped", "blacklistRegexDropped") } func (rcv *UDP) receiveWorker(ctx context.Context) {