diff --git a/net2/FlowAggrTool.js b/net2/FlowAggrTool.js index fb4ffbeb34..d4b139aa6a 100644 --- a/net2/FlowAggrTool.js +++ b/net2/FlowAggrTool.js @@ -130,6 +130,7 @@ class FlowAggrTool { for(let destIP in traffics) { let traffic = (traffics[destIP] && traffics[destIP][trafficDirection]) || 0; + let port = (traffics[destIP] && traffics[destIP].port) || []; if(traffic < MIN_AGGR_TRAFFIC) { continue // skip very small traffic @@ -138,13 +139,13 @@ class FlowAggrTool { args.push(traffic) args.push(JSON.stringify({ device: mac, - destIP: destIP + destIP: destIP, + port: port })) } args.push(0); args.push("_"); // placeholder to keep key exists - await rclient.zaddAsync(args) await rclient.expireAsync(key, expire) await this.trimFlow(mac, trafficDirection, interval, ts) @@ -314,7 +315,7 @@ class FlowAggrTool { if(payload !== '_' && count !== 0) { try { let json = JSON.parse(payload); - results.push({ip: json.destIP, device: json.device, count: count}); + results.push({ip: json.destIP, device: json.device, count: count,port:json.port}); } catch(err) { log.error("Failed to parse payload: ", payload); } diff --git a/net2/FlowTool.js b/net2/FlowTool.js index dbd1e2a661..543fcaf299 100644 --- a/net2/FlowTool.js +++ b/net2/FlowTool.js @@ -643,6 +643,19 @@ class FlowTool { return flow.rb; } } + getTrafficPort(flow) { + let port; + if(flow.fd == "out"){ + port = flow.sp + }else{ + port = flow.dp + } + if(Array.isArray(port)){ + return port + }else{ + return [port] + } + } } module.exports = function() { diff --git a/net2/NetBotTool.js b/net2/NetBotTool.js index 87dbc4abd4..e3457bf207 100644 --- a/net2/NetBotTool.js +++ b/net2/NetBotTool.js @@ -281,7 +281,6 @@ class NetBotTool { json.flows[trafficDirection] = enriched.sort((a, b) => { return b.count - a.count; }); - return traffic } diff --git a/sensor/FlowAggregationSensor.js b/sensor/FlowAggregationSensor.js index aab7c704f6..03613b6c09 100644 --- a/sensor/FlowAggregationSensor.js +++ b/sensor/FlowAggregationSensor.js @@ -163,13 +163,19 @@ class FlowAggregationSensor extends Sensor { let t = traffic[destIP]; if(typeof t === 'undefined') { - traffic[destIP] = {upload: 0, download: 0}; + traffic[destIP] = {upload: 0, download: 0, port:[]}; t = traffic[destIP]; } t.upload += flowTool.getUploadTraffic(flow); t.download += flowTool.getDownloadTraffic(flow); - + for(let port of flowTool.getTrafficPort(flow)){ + port = ""+port;//make sure it is string + if(t.port.indexOf(port)==-1){ + t.port.push(port) + } + } + t.port.sort((a,b)=>{return a-b}) }); return traffic; @@ -452,7 +458,6 @@ class FlowAggregationSensor extends Sensor { flows.push.apply(flows, incomingFlows); let traffic = this.trafficGroupByDestIP(flows); - await flowAggrTool.addFlows(macAddress, "upload", this.config.interval, end, traffic, this.config.aggrFlowExpireTime); await flowAggrTool.addFlows(macAddress, "download", this.config.interval, end, traffic, this.config.aggrFlowExpireTime); }