Editing
Infrastructure Machines
(section)
Jump to navigation
Jump to search
Warning:
You are not logged in. Your IP address will be publicly visible if you make any edits. If you
log in
or
create an account
, your edits will be attributed to your username, along with other benefits.
Anti-spam check. Do
not
fill this in!
==== flow processing: castle ====

<pre>
cat > /usr/home/flowbin/processflows.pl
#!/usr/bin/perl
#
# processflows.pl - run from cron; for every flow file in the queue directory:
#   1. dump it to text with flow-print,
#   2. aggregate packets/octets per local IP, per protocol and per port,
#   3. write the totals to the per-class-C MySQL tables,
#   4. move the flow file to the archive directory.
# A PID lock file in the queue directory keeps two runs from overlapping.
#
#use strict;

#$debug=1;
#$dry=1;

my $log = '/usr/home/flowbin/discards.log';

use Data::Dumper;
BEGIN {
    push @INC, "/usr/home/flowbin";
}
use db;

#my $queuedir = "/usr/home/queue";
my $queuedir = "/usr/home/working";
my $archivedir = "/usr/home/archive";

unless ($dry) {
    my $lockfile = "$queuedir/.lock";
    if (-e $lockfile) {
        my $pid = '';
        if (open(my $lock, '<', $lockfile)) {
            $pid = <$lock>;
            close($lock);
        }
        $pid = '' unless defined $pid;
        chomp($pid);
        # Only trust a purely numeric PID.  An empty or garbled lock file
        # would otherwise become kill(0, 0), which succeeds and would make
        # every later run bail out.
        if ($pid =~ /^[0-9]+$/ && $pid > 0 && kill(0, $pid)) {
            #another process is using the queue, bail out
            exit(0);
        }
        #dead lock file, remove it
        unlink($lockfile);
    }
    # Refuse to run unlocked: two overlapping runs would add the same flows
    # to the daily totals twice.
    open(my $lock, '>', $lockfile) or die "can't write $lockfile: $!";
    print $lock "$$\n";
    close($lock);
}

# NOTE(review): database credentials are hard-coded here; consider moving
# them to a file readable only by the flow user.
my $db = db->new();
$db->connect('traffic', '', 'root', '5over3') || die $db->{'error'};

opendir(DIR, $queuedir) or die "can't read $queuedir: $!";
my @files = readdir(DIR);
closedir(DIR);

foreach my $file (sort @files) {
    next if $file =~ /^\./;

    # The timestamp for every row comes from the file name
    # (e.g. ft-v05.2005-03-28.084500-0800).  Without it the captures below
    # would be stale or undefined, so leave such files alone.
    unless ($file =~ /([0-9]{4}-[0-9]{2}-[0-9]{2})\.([0-9]{2})([0-9]{2})([0-9]{2})/) {
        warn "skipping $file: no timestamp in file name\n";
        next;
    }
    my $date = "$1 $2:$3:$4";
    my $condensedDate = $1;
    $condensedDate =~ s/-//g;

    my $iptotal = {};
    my $protototal = {};
    my $porttotal = {};

    debug("started file $file at ");
    debug(`date`);
    debug("getting raw flow data (flow-print)");
    my $tmpfile = "/usr/home/working/tmp-$file";
    `cat $queuedir/$file | /usr/local/bin/flow-print -f 5 > $tmpfile`;
    debug("aggregating data at ");
    debug(`date`);
    open(my $data, '<', $tmpfile) or die "can't open: $!";

    LINE: while (my $line = readline $data) {
        my @d = split /[\s]+/, $line;
        # skip blank/indented lines and the column header
        next LINE if $d[0] eq '' || $d[0] eq 'Start';

        #Start End Sif SrcIPaddress SrcP DIf DstIPaddress DstP P Fl Pkts Octets
        #0     1   2   3            4    5   6            7    8 9  10   11
        #
        # direction: outbound = 2, inbound = 1
        my $dir;
        my $addr;
        if (is_castle_addr($d[3])) {
            $dir = 2;
            $addr = $d[3];
            # hack for outbound bulk traffic counted 2x
            my @dst_ip = split /\./, $d[6];
            if ($dst_ip[2] == 234) { $d[11] /= 2; $d[10] /= 2; }
        }
        elsif (is_castle_addr($d[6])) {
            $dir = 1;
            $addr = $d[6];
        }
        else {
            next LINE;
        }

        my $port = service_port($d[4], $d[7]);

        my @ip = split /\./, $addr;
        my $classC = "$ip[0]_$ip[1]_$ip[2]";

        # IP dir
#       if ($d[10] < 2147483647) { $iptotal->{$classC}->{$addr}->{$d[2]}->{'pktTotal'} += $d[10]; }
#       if ($d[11] < 2147483647) { $iptotal->{$classC}->{$addr}->{$d[2]}->{'octetTotal'} += $d[11]; }
#
#       if ($d[10] < 2147483647) { $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'pktTotal'} += $d[10]; }
#       if ($d[11] < 2147483647) { $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'octetTotal'} += $d[11]; }
#
#       if ($d[10] < 2147483647) { $porttotal->{$classC}->{$addr}->{$d[2]}->{$port}->{'pktTotal'} += $d[10]; }
#       if ($d[11] < 2147483647) { $porttotal->{$classC}->{$addr}->{$d[2]}->{$port}->{'octetTotal'} += $d[11]; }
        $iptotal->{$classC}->{$addr}->{$dir}->{'pktTotal'} += $d[10];
        $iptotal->{$classC}->{$addr}->{$dir}->{'octetTotal'} += $d[11];

        $protototal->{$classC}->{$addr}->{$dir}->{$d[8]}->{'pktTotal'} += $d[10];
        $protototal->{$classC}->{$addr}->{$dir}->{$d[8]}->{'octetTotal'} += $d[11];

        $porttotal->{$classC}->{$addr}->{$dir}->{$d[8]}->{$port}->{'pktTotal'} += $d[10];
        $porttotal->{$classC}->{$addr}->{$dir}->{$d[8]}->{$port}->{'octetTotal'} += $d[11];
    }
    close($data);
    unlink($tmpfile);

    debug("processing ip totals at ");
    debug(`date`);
    foreach my $classC (keys(%{$iptotal})) {
        $db->query("lock tables dailyIpTotals_$classC write") unless $dry;
        my @values;
        foreach my $ip (keys(%{$iptotal->{$classC}})) {
            foreach my $dir (keys(%{$iptotal->{$classC}->{$ip}})) {
                my $octets = $iptotal->{$classC}->{$ip}->{$dir}->{'octetTotal'};
                my $packets = $iptotal->{$classC}->{$ip}->{$dir}->{'pktTotal'};
#               $packets = $packets > 2147483647 ? 0 : $packets;
                # totals above the signed 32-bit limit are only logged, not altered
                if ($octets > 2147483647) {
                    my $ddir = $dir==1 ? 'in' : 'out';
                    `echo "$date $ip $ddir $octets\n" >> $log`;
#                   $octets = 0;
                }
                # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction
                my $id = "$ip-$condensedDate-$dir";
                $id =~ s/\.//g;
                push @values, "('$date', '$ip', $dir, $octets, $packets)";
                my $sql = "insert into dailyIpTotals_$classC values ('$id', '$date', '$ip', $dir, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets";
                print "$sql\n" if $dry;
                $db->query($sql) unless $dry;
#               $db->query("insert into ipTotals values ('$date', '$ip', $dir, $octets, $packets)");
            }
        }
        $db->query("unlock tables") unless $dry;

        $db->query("lock tables ipTotals_$classC write") unless $dry;
        # break inserts into 100 records at a time
        debug("inserting $#values +1 values");
        insert_batched($db, "ipTotals_$classC", \@values);
        $db->query("unlock tables") unless $dry;
    }

    sleep 20;

#   debug("processing protocol totals at ");
#   debug(`date`);
#   foreach my $classC (keys(%{$protototal})) {
#       $db->query("lock tables dailyProtoTotals_$classC write") unless $dry;
#       my @values;
#       foreach my $ip (keys(%{$protototal->{$classC}})) {
#           foreach my $dir (keys(%{$protototal->{$classC}->{$ip}})) {
#               foreach my $proto (keys(%{$protototal->{$classC}->{$ip}->{$dir}})) {
#                   my $octets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'octetTotal'};
#                   my $packets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'pktTotal'};
#                   # $octets = $octets > 2147483647 ? 0 : $octets;
#                   # $packets = $packets > 2147483647 ? 0 : $packets;
#                   # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto
#                   my $id = "$ip-$condensedDate-$dir-$proto";
#                   $id =~ s/\.//g;
#                   push @values, "('$date', '$ip', $dir, $proto, $octets, $packets)";
#                   my $sql = "insert into dailyProtoTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets";
#                   print "$sql\n" if $dry;
#                   $db->query($sql) unless $dry;
#                   # $db->query("insert into protoTotals values ('$date', '$ip', $dir, $proto, $octets, $packets)");
#               }
#           }
#       }
#       $db->query("unlock tables") unless $dry;
#       my $sql = "insert into protoTotals_$classC values ";
#       $sql .= join ',', @values;
#       $db->query("lock tables protoTotals_$classC write") unless $dry;
#       print "$sql\n" if $dry;
#       $db->query($sql) unless $dry;
#       $db->query("unlock tables") unless $dry;
#   }

    debug("processing port totals at ");
    debug(`date`);
    foreach my $classC (keys(%{$porttotal})) {
        $db->query("lock tables dailyPortTotals_$classC write") unless $dry;
        my @values;
        foreach my $ip (keys(%{$porttotal->{$classC}})) {
            foreach my $dir (keys(%{$porttotal->{$classC}->{$ip}})) {
                foreach my $proto (keys(%{$porttotal->{$classC}->{$ip}->{$dir}})) {
                    foreach my $port (keys(%{$porttotal->{$classC}->{$ip}->{$dir}->{$proto}})) {
                        my $octets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'octetTotal'};
                        my $packets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'pktTotal'};
#                       $octets = $octets > 2147483647 ? 0 : $octets;
#                       $packets = $packets > 2147483647 ? 0 : $packets;
                        # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-protocol-port
                        my $id = "$ip-$condensedDate-$dir-$proto-$port";
                        $id =~ s/\.//g;
                        push @values, "('$date', '$ip', $dir, $proto, $port, $octets, $packets)";
                        my $sql = "insert into dailyPortTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $port, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets";
                        print "$sql\n" if $dry;
                        $db->query($sql) unless $dry;
#                       $db->query("insert into portTotals values ('$date', '$ip', $dir, $port, $octets, $packets)");
                    }
                }
            }
        }
        $db->query("unlock tables") unless $dry;

        $db->query("lock tables portTotals_$classC write") unless $dry;
        # break inserts into 100 records at a time
        debug("inserting $#values +1 values");
        insert_batched($db, "portTotals_$classC", \@values);
        $db->query("unlock tables") unless $dry;
        sleep 10;
    }

#   12 1 8 1 1= 23
#   dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction
#   12 1 8 1 1 3=26
#   dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto
#   12 1 8 1 1 5=28
#   dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-port

    #print "finished at ";
    #print `date`;
    `mv $queuedir/$file $archivedir` unless $dry;
}

unlink("$queuedir/.lock") unless $dry;

# Print a message only when $debug is set.
sub debug {
    my $message = shift;
    if ($debug) { print "$message\n"; }
}

# True when the dotted-quad address is one of the locally accounted
# networks: 69.55.224 - 69.55.239, except .229 and .231 (castle).
# For i2b the third-octet test would be "== 229" instead.
sub is_castle_addr {
    my @octet = split /\./, shift;
    return $octet[0] == 69
        && $octet[1] == 55
        && $octet[2] >= 224
        && $octet[2] <= 239
        && $octet[2] != 229
        && $octet[2] != 231;
}

# Pick the port a flow is accounted under: the destination port when it is
# low (<= 1024), otherwise the source port when that is low (return
# traffic), otherwise the catch-all 99999.  The same rule is used for
# inbound and outbound flows.
sub service_port {
    my ($src_port, $dst_port) = @_;
    return $dst_port if $dst_port <= 1024;
    return $src_port if $src_port <= 1024;
    return 99999;
}

# Insert the pre-formatted "(...)" row strings in @$values into $table,
# at most 100 rows per statement.  Empties @$values.  Loops until the
# array is empty so that a final single row is not left behind.
# In dry mode ($dry) the statements are printed instead of executed.
sub insert_batched {
    my ($db, $table, $values) = @_;
    while (@$values) {
        my @batch = splice(@$values, 0, 100);
        my $sql = "insert into $table values " . join(',', @batch);
        print "$sql\n" if $dry;
        $db->query($sql) unless $dry;
    }
    return;
}

# var full during ft-v05.2005-03-28.084500-0800 and
# 2005-02-24 69.55.226
# all port/daily totals before 2005-04-07
</pre>

<pre>
cat > /usr/home/flowbin/db.pm
#!/usr/bin/perl
#
# $Header: /usr/cvs/newgw/lib/db.pm,v 1.4 2003/06/05 18:20:01 glenn Exp $
#
# Copyright (c) 2003
# e-Monitoring Networks, Inc.  All rights reserved.
#
#
# Thin wrapper around a DBI/MySQL handle.  Failures are reported through
# return values and $self->{'error'} rather than exceptions.

package db;

use strict;
use DBI;

# Create an unconnected wrapper with debug output disabled.
sub new {
    my $class = shift;
    my $self = {};
    $self->{'debug'} = 0;
    bless $self, $class;
    return $self;
}

# connect($dbname, $dbhost, $dbuser, $dbpass)
# Returns 1 on success; on failure returns 0 and sets $self->{'error'}.
# An undefined or empty $dbhost leaves the host out of the DSN.
sub connect {
    my $self = shift;
    my $dbname = shift;
    my $dbhost = shift;
    my $dbuser = shift;
    my $dbpass = shift;

    my $dsn = "DBI:mysql:database=$dbname";
    if (defined($dbhost) && $dbhost ne '') {
        $dsn .= ";host=$dbhost";
    }

    eval {
        $self->debug("connecting to: $dsn", 1);
        $self->{'dbh'} = DBI->connect($dsn, $dbuser, $dbpass);
    };
    # DBI->connect normally returns undef instead of dying, leaving $@
    # empty, so fall back to DBI's own error string.
    my $err = $@ || $DBI::errstr || 'unknown error';

    if ($self->{'dbh'}) {
        return 1;
    }

    $self->{'error'} = "Error connecting to database $err";
    $self->debug("Error connecting to database $err");
    return 0;
}

# query($sql)
# Prepares and executes $sql.  Returns ($rows, $sth) on success, where $rows
# is what execute returned; on failure returns undef and sets
# $self->{'error'}.
sub query {
    my $self = shift;
    my $query = shift;

    $self->debug($query, 1);

    my $sth;
    eval {
        $sth = $self->{'dbh'}->prepare($query);
    };
    unless ($sth) {
        my $err = $@ || $DBI::errstr || 'unknown error';
        $self->{'error'} = "error preparing query $err";
        $self->debug("error preparing query $err");
        return undef;
    }

    my $qty;
    eval {
        $qty = $sth->execute;
    };
    unless ($qty) {
        my $err = $@ || $DBI::errstr || 'unknown error';
        $self->{'error'} = "error executing query $err";
        warn "error executing query $err $query";
        return undef;
    }

    $self->debug("returning $qty, $sth from query", 6);
    return ($qty, $sth);
}

# Close the database handle.  Always returns 0.
sub disconnect {
    my $self = shift;
    $self->{'dbh'}->disconnect;
    return 0;
}

# debug($msg, $level)
# Print $msg when $level (default 0) is below $self->{'debug'}.
sub debug {
    my $self = shift;
    my $msg = shift;
    my $level = shift || 0;

    if ($level < $self->{'debug'}) {
        print "$msg\n";
    }
    return 0;
}

1;
</pre>

mkdir /usr/home/archive
mkdir -p /usr/home/sql/tmp

<pre>crontab -e
#process flows
2,17,32,47 * * * * /usr/home/flowbin/processflows.pl</pre>
Summary:
Please note that all contributions to JCWiki may be edited, altered, or removed by other contributors. If you do not want your writing to be edited mercilessly, then do not submit it here.
You are also promising us that you wrote this yourself, or copied it from a public domain or similar free resource (see
JCWiki:Copyrights
for details).
Do not submit copyrighted work without permission!
Cancel
Editing help
(opens in new window)
Navigation menu
Personal tools
Not logged in
Talk
Contributions
Create account
Log in
Namespaces
Page
Discussion
English
Views
Read
Edit
View history
More
Search
Navigation
Main page
Recent changes
Random page
Help about MediaWiki
Tools
What links here
Related changes
Special pages
Page information