Editing
Infrastructure Machines
(section)
Jump to navigation
Jump to search
Warning:
You are not logged in. Your IP address will be publicly visible if you make any edits. If you
log in
or
create an account
, your edits will be attributed to your username, along with other benefits.
Anti-spam check. Do
not
fill this in!
==== flow processing: i2b ====
<pre>cat > /usr/home/flowbin/processflows-sql.pl
#!/usr/bin/perl
#
# processflows-sql.pl - turn queued flow capture files into SQL files.
#
# For every capture file found in $queuedir this script:
#   1. dumps it to text with flow-print (format 5),
#   2. keeps only flows whose source or destination address is in
#      69.55.229.x or 69.55.231.x (the i2b blocks),
#   3. sums packets and octets per IP/direction and per
#      IP/direction/protocol/port,
#   4. writes the totals as INSERT statements into a .sql file, bzip2s it
#      and moves it into $sqldir (sendsql.pl picks up *.bz2 from there),
#   5. moves the capture file to $archivedir.
#
# Direction codes written to the SQL: 1 = inbound, 2 = outbound.
#
# NOTE(review): with $dry set the SQL file is still written, compressed and
# moved into $sqldir; only locking and archiving of the capture file are
# skipped. Confirm that is intended before using $dry on a live box.

use strict;
use warnings;

use Data::Dumper;
use File::Copy qw(move);

BEGIN { push @INC, "/usr/home/flowbin"; }

my $debug = 0;    # set to 1 to print progress messages
my $dry   = 0;    # set to 1 to echo the SQL, skip locking and leave the queue alone

my $log = '/usr/home/flowbin/discards.log';    # not referenced below

#my $queuedir = "/usr/home/queue";
my $queuedir      = "/usr/home/working";
my $archivedir    = "/usr/home/archive";
my $sqldir        = "/usr/home/sql";
my $sqldirworking = "/usr/home/sql/tmp";
my $tmpdir        = "/usr/home/working";            # flow-print text dumps
my $flowprint     = "/usr/local/bin/flow-print";
my $batch_size    = 100;                            # rows per multi-row INSERT

# note- this is where we filter out IPs only found at i2b
my $local_re = qr/^69\.55\.(?:229|231)(?:\.|$)/;

acquire_lock("$queuedir/.lock") unless $dry;

opendir(my $dh, $queuedir) or die "can't read $queuedir: $!";
my @files = readdir($dh);
closedir($dh);

FILE: foreach my $file (sort @files) {
    next FILE if $file =~ /^\./;

    # Text dumps left behind by an interrupted run sit in the same directory
    # as the queue; they are not capture files.
    next FILE if $file =~ /^tmp-/;
    next FILE unless -f "$queuedir/$file";

    # The date and time come from the file name; without them there is no
    # valid timestamp for the rows, so the file is left alone.
    unless ($file =~ /([0-9]{4}-[0-9]{2}-[0-9]{2})\.([0-9]{2})([0-9]{2})([0-9]{2})/) {
        warn "skipping $queuedir/$file: no date/time in file name\n";
        next FILE;
    }
    my ($day, $hh, $mi, $ss) = ($1, $2, $3, $4);
    my $date    = "$day $hh:$mi:$ss";
    my $outfile = "$day-$hh:$mi.sql";
    (my $condensedDate = $day) =~ s/-//g;

    my $flowfile = "$queuedir/$file";
    my $tmpfile  = "$tmpdir/tmp-$file";
    my $sqlpath  = "$sqldirworking/$outfile";

    open(my $sql_fh, '>', $sqlpath) or die "cant open $sqlpath: $!";

    my $iptotal    = {};
    my $protototal = {};    # only consumed by the disabled protocol section below
    my $porttotal  = {};

    debug("started file $file at " . localtime());
    debug("getting raw flow data (flow-print)");
    system("$flowprint -f 5 < \Q$flowfile\E > \Q$tmpfile\E") == 0
        or warn "flow-print exited with status $? for $flowfile\n";

    debug("aggregating data at " . localtime());
    open(my $data, '<', $tmpfile) or die "can't open $tmpfile: $!";

    LOOP: while (my $line = readline($data)) {
        my @d = split /\s+/, $line;

        # Skip blank lines, lines that begin with whitespace and the header.
        next LOOP unless defined $d[0] && $d[0] ne '' && $d[0] ne 'Start';
        next LOOP if @d < 12;    # truncated line

        #Start End Sif SrcIPaddress SrcP DIf DstIPaddress DstP P Fl Pkts Octets
        #0     1   2   3            4    5   6            7    8 9  10   11
        my ($src, $sport, $dst, $dport, $proto, $pkts, $octets)
            = @d[3, 4, 6, 7, 8, 10, 11];

        # outbound = 2, inbound = 1
        my ($dir, $addr);
        if ($src =~ $local_re) {
            # for i2b
            ($dir, $addr) = (2, $src);
            # hack for outbound bulk traffic counted 2x
            #if ($src_ip[2] == 231) { $d[11] /= 2; $d[10] /= 2; }
        }
        elsif ($dst =~ $local_re) {
            # for i2b
            ($dir, $addr) = (1, $dst);
        }
        else {
            next LOOP;
        }

        my $port   = service_port($sport, $dport);
        my $classC = join '_', (split /\./, $addr)[0 .. 2];

        # Earlier revisions skipped any single value >= 2147483647 here;
        # that guard is disabled and every flow is counted.
        $iptotal->{$classC}->{$addr}->{$dir}->{'pktTotal'}   += $pkts;
        $iptotal->{$classC}->{$addr}->{$dir}->{'octetTotal'} += $octets;

        $protototal->{$classC}->{$addr}->{$dir}->{$proto}->{'pktTotal'}   += $pkts;
        $protototal->{$classC}->{$addr}->{$dir}->{$proto}->{'octetTotal'} += $octets;

        $porttotal->{$classC}->{$addr}->{$dir}->{$proto}->{$port}->{'pktTotal'}   += $pkts;
        $porttotal->{$classC}->{$addr}->{$dir}->{$proto}->{$port}->{'octetTotal'} += $octets;
    }
    close($data);
    unlink($tmpfile);

    debug("processing ip totals at " . localtime());
    foreach my $classC (sort keys %{$iptotal}) {
        my @values;
        foreach my $ip (sort keys %{ $iptotal->{$classC} }) {
            foreach my $dir (sort keys %{ $iptotal->{$classC}->{$ip} }) {
                my $octets  = $iptotal->{$classC}->{$ip}->{$dir}->{'octetTotal'};
                my $packets = $iptotal->{$classC}->{$ip}->{$dir}->{'pktTotal'};

                # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction
                (my $id = "$ip-$condensedDate-$dir") =~ s/\.//g;

                push @values, "('$date', '$ip', $dir, $octets, $packets)";
                my $sql = "insert into dailyIpTotals_$classC values ('$id', '$date', '$ip', $dir, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets";
                print "$sql\n" if $dry;
                print {$sql_fh} "$sql;\n";
                # $db->query("insert into ipTotals values ('$date', '$ip', $dir, $octets, $packets)");
            }
        }
        write_batches($sql_fh, "ipTotals_$classC", \@values);
    }

    # Protocol totals are disabled; kept for reference.
    # &debug("processing protocol totals at ");
    # &debug(`date`);
    # foreach my $classC (keys(%{$protototal})) {
    #     $db->query("lock tables dailyProtoTotals_$classC write") unless $dry;
    #     my @values;
    #     foreach my $ip (keys(%{$protototal->{$classC}})) {
    #         foreach my $dir (keys(%{$protototal->{$classC}->{$ip}})) {
    #             foreach my $proto (keys(%{$protototal->{$classC}->{$ip}->{$dir}})) {
    #                 my $octets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'octetTotal'};
    #                 my $packets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'pktTotal'};
    #                 # $octets = $octets > 2147483647 ? 0 : $octets;
    #                 # $packets = $packets > 2147483647 ? 0 : $packets;
    #                 # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto
    #                 my $id = "$ip-$condensedDate-$dir-$proto";
    #                 $id =~ s/\.//g;
    #                 push @values, "('$date', '$ip', $dir, $proto, $octets, $packets)";
    #                 my $sql = "insert into dailyProtoTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets";
    #                 print "$sql\n" if $dry;
    #                 $db->query($sql) unless $dry;
    #                 # $db->query("insert into protoTotals values ('$date', '$ip', $dir, $proto, $octets, $packets)");
    #             }
    #         }
    #     }
    #     $db->query("unlock tables") unless $dry;
    #     my $sql = "insert into protoTotals_$classC values ";
    #     $sql .= join ',', @values;
    #     $db->query("lock tables protoTotals_$classC write") unless $dry;
    #     print "$sql\n" if $dry;
    #     $db->query($sql) unless $dry;
    #     $db->query("unlock tables") unless $dry;
    # }

    debug("processing port totals at " . localtime());
    foreach my $classC (sort keys %{$porttotal}) {
        my @values;
        foreach my $ip (sort keys %{ $porttotal->{$classC} }) {
            foreach my $dir (sort keys %{ $porttotal->{$classC}->{$ip} }) {
                foreach my $proto (sort keys %{ $porttotal->{$classC}->{$ip}->{$dir} }) {
                    foreach my $port (sort keys %{ $porttotal->{$classC}->{$ip}->{$dir}->{$proto} }) {
                        my $octets  = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'octetTotal'};
                        my $packets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'pktTotal'};

                        # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-protocol-port
                        (my $id = "$ip-$condensedDate-$dir-$proto-$port") =~ s/\.//g;

                        push @values, "('$date', '$ip', $dir, $proto, $port, $octets, $packets)";
                        my $sql = "insert into dailyPortTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $port, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets";
                        print "$sql\n" if $dry;
                        print {$sql_fh} "$sql;\n";
                        # $db->query("insert into portTotals values ('$date', '$ip', $dir, $port, $octets, $packets)");
                    }
                }
            }
        }
        write_batches($sql_fh, "portTotals_$classC", \@values);
    }

    # 12 1 8 1 1= 23
    # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction
    # 12 1 8 1 1 3=26
    # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto
    # 12 1 8 1 1 5=28
    # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-port

    # Finish and stage the SQL file first; the capture file is archived only
    # afterwards, so a failure here leaves it queued for the next run.
    close($sql_fh) or die "error writing $sqlpath: $!";

    # -f: replace a stale .bz2 left in the working directory by an earlier
    # interrupted run instead of refusing to compress.
    if (system('bzip2', '-f', $sqlpath) != 0) {
        warn "bzip2 failed (status $?) for $sqlpath\n";
        next FILE;
    }
    unless (move("$sqlpath.bz2", "$sqldir/$outfile.bz2")) {
        warn "can't move $sqlpath.bz2 to $sqldir: $!\n";
        next FILE;
    }

    unless ($dry) {
        move($flowfile, "$archivedir/$file")
            or warn "can't move $flowfile to $archivedir: $!\n";
    }
}

unlink("$queuedir/.lock") unless $dry;

# Take the lock file, or exit quietly if a live process already holds it.
# A lock whose recorded pid is missing, malformed or no longer running is
# treated as stale and replaced.
sub acquire_lock {
    my ($lockfile) = @_;

    if (-e $lockfile) {
        my $pid = '';
        if (open(my $fh, '<', $lockfile)) {
            $pid = readline($fh);
            $pid = '' unless defined $pid;
            chomp $pid;
            close($fh);
        }

        # Only probe a real pid: kill(0, 0) would test our own process group
        # and always succeed, wedging the queue behind an empty lock file.
        if ($pid =~ /^\d+$/ && $pid > 0 && kill(0, $pid)) {
            #another process is using the queue, bail out
            exit(0);
        }

        #dead lock file, remove it
        unlink($lockfile);
    }

    open(my $fh, '>', $lockfile) or die "can't write $lockfile: $!";
    print {$fh} "$$\n";
    close($fh) or die "can't write $lockfile: $!";
    return;
}

# Pick the port a flow is accounted under: the destination port if it is
# <= 1024, else the source port if that is <= 1024, else the catch-all 99999.
# The inbound and outbound cases use the same rule.
sub service_port {
    my ($src_port, $dst_port) = @_;
    return $dst_port if $dst_port <= 1024;
    return $src_port if $src_port <= 1024;
    return 99999;
}

# Write the rows in @$values to $fh as multi-row INSERTs into $table,
# at most $batch_size rows per statement. Empties @$values.
sub write_batches {
    my ($fh, $table, $values) = @_;

    debug("inserting " . scalar(@{$values}) . " values");
    while (@{$values}) {
        my @batch = splice(@{$values}, 0, $batch_size);
        my $sql = "insert into $table values " . join(',', @batch);
        print "$sql\n" if $dry;
        print {$fh} "$sql;\n";
    }
    return;
}

# Print a progress message when $debug is set.
sub debug {
    my $message = shift;
    if ($debug) {
        print "$message\n";
    }
}

# var full during ft-v05.2005-03-28.084500-0800 and
# 2005-02-24 69.55.226
# all port/daily totals before 2005-04-07
</pre>
This script sends the sql files to the traffic server for processing:
<pre>cat > /usr/home/flowbin/sendsql.pl
#!/usr/bin/perl
#
# sendsql.pl - copy the compressed SQL files in $sqldir to the traffic
# server.
#
# Each *bz2 file is copied with scp into $sqldirremote on $remote and then
# renamed there to <file>.done; once both steps succeed the local copy is
# moved to $archive. Failures are collected and mailed to support; a file
# that was not fully delivered stays in $sqldir and is retried on the next
# run.

use strict;
use warnings;

use File::Copy qw(move);
use Mail::Sendmail;

my $debug = 0;    # set to 1 to print progress messages
my $dry   = 0;    # set to 1 to skip locking

my $remote       = "69.55.233.199";
my $sqldir       = "/usr/home/sql";
my $archive      = "/usr/home/archive";
my $sqldirremote = "/data/bwdb2/pending/";
my @err;

acquire_lock("$sqldir/.lock") unless $dry;

opendir(my $dh, $sqldir) or die "can't read $sqldir: $!";
my @files = readdir($dh);
closedir($dh);

foreach my $file (sort @files) {
    next unless $file =~ /bz2$/;

    my $r = `scp -Cq $sqldir/$file $remote:$sqldirremote 2>&1`;
    # print "scp $sqldir/$file $remote:$sqldirremote";
    unless ($? == 0) {
        push @err, "scp -Cq $sqldir/$file $remote:$sqldirremote ($r)";
        next;
    }

    # Rename on the remote side before archiving locally, so a failed rename
    # leaves the file here to be sent again.
    $r = `ssh $remote mv $sqldirremote/$file $sqldirremote/${file}.done 2>&1`;
    unless ($? == 0) {
        push @err, "ssh $remote mv $sqldirremote/$file $sqldirremote/${file}.done ($r)";
        next;
    }

    move("$sqldir/$file", "$archive/$file")
        or push @err, "mv $sqldir/$file $archive ($!)";
}

unlink("$sqldir/.lock") unless $dry;

if (@err) {
    email_support('bwdb2: sendsql.pl error', join "\n", @err);
}

# Take the lock file, or exit quietly if a live process already holds it.
# A lock whose recorded pid is missing, malformed or no longer running is
# treated as stale and replaced.
sub acquire_lock {
    my ($lockfile) = @_;

    if (-e $lockfile) {
        my $pid = '';
        if (open(my $fh, '<', $lockfile)) {
            $pid = readline($fh);
            $pid = '' unless defined $pid;
            chomp $pid;
            close($fh);
        }

        # Only probe a real pid: kill(0, 0) would test our own process group
        # and always succeed, wedging the queue behind an empty lock file.
        if ($pid =~ /^\d+$/ && $pid > 0 && kill(0, $pid)) {
            #another process is using the queue, bail out
            exit(0);
        }

        #dead lock file, remove it
        unlink($lockfile);
    }

    open(my $fh, '>', $lockfile) or die "can't write $lockfile: $!";
    print {$fh} "$$\n";
    close($fh) or die "can't write $lockfile: $!";
    return;
}

# Mail $body with subject $subj to the support addresses.
sub email_support {
    my ($subj, $body) = @_;

    # prepare message
    my %mail = (
        To      => 'support@johncompanies.com,dave@johncompanies.com',
        From    => 'support@johncompanies.com',
        Subject => $subj,
        Message => $body,
        smtp    => 'mail.johncompanies.com',
    );
    sendmail(%mail) || warn "Error: $Mail::Sendmail::error";
}

# Print a progress message when $debug is set.
sub debug {
    my $message = shift;
    if ($debug) {
        print "$message\n";
    }
}

# var full during ft-v05.2005-03-28.084500-0800 and
# 2005-02-24 69.55.226
# all port/daily totals before 2005-04-07
</pre>
<pre>crontab -e
#process flows
2,17,32,47 * * * * /usr/home/flowbin/processflows-sql.pl
#move sql commands to traffic db
8,23,38,53 * * * * /usr/home/flowbin/sendsql.pl
</pre>
Summary:
Please note that all contributions to JCWiki may be edited, altered, or removed by other contributors. If you do not want your writing to be edited mercilessly, then do not submit it here.
You are also promising us that you wrote this yourself, or copied it from a public domain or similar free resource (see
JCWiki:Copyrights
for details).
Do not submit copyrighted work without permission!
Cancel
Editing help
(opens in new window)
Navigation menu
Personal tools
Not logged in
Talk
Contributions
Create account
Log in
Namespaces
Page
Discussion
English
Views
Read
Edit
View history
More
Search
Navigation
Main page
Recent changes
Random page
Help about MediaWiki
Tools
What links here
Related changes
Special pages
Page information