=== netflow ===
Install flow tools:
<pre>cd /usr/ports/net-mgmt/flow-tools
make install clean</pre>
Accept the default build options.

 mkdir /usr/home/flows

Flow start script:
 echo "/usr/local/bin/flow-capture -w /usr/home/flows -S5 -N -2 0/10.1.4.203/4444" > /usr/local/etc/rc.d/flow-capture.sh
 chmod 0700 /usr/local/etc/rc.d/flow-capture.sh

Edit the address (10.1.4.203 here) to match the host's private IP.

Netgraph start script:
<pre>
cat > /usr/local/etc/rc.d/netgraph.sh
/usr/sbin/ngctl -f- <<-SEQ
mkpeer em0: netflow lower iface0
name em0:lower netflow
connect em0: netflow: upper out0
mkpeer netflow: ksocket export inet/dgram/udp
msg netflow:export connect inet/10.1.4.203:4444
SEQ
#/usr/sbin/ngctl -f- <<-SEQ
#shutdown netflow:
#SEQ

chmod 0700 /usr/local/etc/rc.d/netgraph.sh</pre>

Again, edit the address to match the host's private IP.

After running both scripts, confirm that netflow is running:
<pre>newbwdb /usr/ports/net-mgmt/flow-tools# /usr/sbin/ngctl
Available commands:
  config     get or set configuration of node at <path>
  connect    Connects hook <peerhook> of the node at <relpath> to <hook>
  debug      Get/set debugging verbosity level
  dot        Produce a GraphViz (.dot) of the entire netgraph.
  help       Show command summary or get more help on a specific command
  list       Show information about all nodes
  mkpeer     Create and connect a new node to the node at "path"
  msg        Send a netgraph control message to the node at "path"
  name       Assign name <name> to the node at <path>
  read       Read and execute commands from a file
  rmhook     Disconnect hook "hook" of the node at "path"
  show       Show information about the node at <path>
  shutdown   Shutdown the node at <path>
  status     Get human readable status information from the node at <path>
  types      Show information about all installed node types
  write      Send a data packet down the hook named by "hook".
  quit       Exit program
+ show netflow:
  Name: netflow         Type: netflow         ID: 00000004   Num hooks: 3
  Local hook      Peer name       Peer type    Peer ID         Peer hook
  ----------      ---------       ---------    -------         ---------
  export          <unnamed>       ksocket      00000005        inet/dgram/udp
  out0            em0             ether        00000001        upper
  iface0          em0             ether        00000001        lower
+
</pre>

We have seen flow-capture fail occasionally because of swap exhaustion, even after adding more swap, so a cron job restarts it every 15 minutes. Starting it while it is already running is harmless: the new instance simply exits.
<pre>
crontab -e
#restart flow-capture
*/15 * * * * /usr/local/etc/rc.d/flow-capture.sh
</pre>

==== process flow tools ====
<pre>mkdir /usr/home/flowbin
mkdir /usr/home/working</pre>

Install the Perl modules:
<pre>cd /usr/ports/devel/p5-Date-Calc
make install clean
cd /usr/ports/mail/p5-Mail-Sendmail
make install clean</pre>

Queue script:
<pre>
cat > /usr/home/flowbin/queue.pl
#!/usr/bin/perl

use strict;

BEGIN {
    push @INC, "/usr/home/flowbin";
}

use date;

my $flowbase = "/usr/home/flows";
#my $flowqueue = "/usr/home/queue";
my $flowqueue = "/usr/home/working";

my ($date, $time) = date::CurrentDateTime();

# move today's completed flow files (ft-*) into the working queue
my $flowdir = mkFlowDir($date);
`mv $flowdir/ft-* $flowqueue`;

# within 10 minutes of midnight, also sweep yesterday's directory
if (date::DateWindow($date, $time, $date, "00:00:00", 600)) {
    my $newdate = date::AddDays($date, -1);
    my $flowdir = mkFlowDir($newdate);
    `mv $flowdir/ft-* $flowqueue`;
}

# build the flow-capture directory path: <base>/YYYY-MM/YYYY-MM-DD
sub mkFlowDir {
    my $date = shift;
    $date =~ /([0-9]{4}-[0-9]{2})/;
    my $yearmonth = $1;
    return "$flowbase/$yearmonth/$date";
}
</pre>

date.pm module:
<pre>
cat > /usr/home/flowbin/date.pm
#!/usr/local/bin/perl
#
# $Header: /usr/cvs/newgw/lib/date.pm,v 1.2 2003/11/24 17:06:02 glenn Exp $
#
# Copyright (c) 2001, 2002, 2003
# e-Monitoring Networks, Inc. All rights reserved.
# # # # date.pl - Higher level functions written on top of Date::Calc package date; use strict; use Date::Calc qw(:all); sub DayDiff { #calculate the difference in days from two dates my $date1 = shift; my $date2 = shift; my ($year1, $month1, $day1) = &DateToymd($date1); my ($year2, $month2, $day2) = &DateToymd($date2); my $diff = &Delta_Days($year1, $month1, $day1, $year2, $month2, $day2); return $diff; } sub AddDays { #adds specified number of days to the supplied date my $date = shift; my $days = shift; my ($year, $month, $day) = &DateToymd($date); my ($nyear, $nmonth, $nday) = &Add_Delta_Days($year, $month, $day, $days); my $ndate = &ymdToDate($nyear, $nmonth, $nday); return $ndate; } sub AddHours { #adds specified number of hours to the supplied date and time my $date = shift; my $time = shift; my $addhours = shift; my $adddays = 0; if (abs($addhours / 24) >= 1) { $adddays = int($addhours / 24); $addhours -= $adddays * 24; } my ($year, $month, $day) = &DateToymd($date); my ($hour, $minute, $second) = &TimeTohms($time); my ($ny, $nm, $nd, $nh, $nmin, $ns) = &Add_Delta_DHMS($year, $month, $day, $hour, $minute, $second, $adddays, $addhours, 0, 0); my $ndate = &ymdToDate($ny, $nm, $nd); my $ntime = &hmsToTime($nh, $nmin, $ns); return $ndate, $ntime; } sub AddMinutes { my $date = shift; my $time = shift; my $minutes = shift; my ($year, $month, $day) = &DateToymd($date); my ($hour, $minute, $second) = &TimeTohms($time); my ($ny, $nm, $nd, $nh, $nmin, $ns) = &Add_Delta_DHMS($year, $month, $day, $hour, $minute, $second, 0, 0, $minutes, 0); my $ndate = &ymdToDate($ny, $nm, $nd); my $ntime = &hmsToTime($nh, $nmin, $ns); return $ndate, $ntime; } sub CurrentDateTime { #return the current date and time my ($y, $m, $d, $h, $min, $s, $z, $z, $z) = &System_Clock; my $date = &ymdToDate($y, $m, $d); my $time = &hmsToTime($h, $min, $s); return $date, $time; } sub Currentymd { #return the current year, month and day as separate variables my ($y, $m, $d, $h, $min, $s, $z, $z, $z) 
= &System_Clock; return $y, $m, $d; } sub DateToymd { #takes a date and returns year, month, day as individual values my $date = shift; if ($date =~ /([0-9]{4})-([0-9]{2})-([0-9]{2})/) { my $day = $3; my $month = $2; my $year = $1; return $year, $month, $day; } return undef; } sub TimeTohms { #takes a time and return hours minutes and seconds as individual values my $time = shift; if ($time =~ /([0-9]{1,2}):([0-9]{1,2}):([0-9]{1,2})/) { my $hour = $1; my $minute = $2; my $second = $3; if ($hour !~ /[0-9]{2}/) { $hour = "0$hour"; } if ($minute !~ /[0-9]{2}/) { $minute = "0$minute"; } if ($second !~ /[0-9]{2}/) { $second = "0$second"; } return $hour, $minute, $second; } return undef; } sub ymdToDate { #takes year, month, day and assembles them into our date format my $year = shift; my $month = shift; my $day = shift; if (defined($year) && defined($month) && defined ($day)) { $month = sprintf("%02d", $month); $day = sprintf("%02d", $day); return "$year-$month-$day"; } return undef; } sub hmsToTime { #takes hour minute and second and assembles them into our time format my $hour = shift; my $minute = shift; my $second = shift; if (defined($hour) && defined($minute) && defined ($second)) { if ($hour !~ /[0-9]{2}/) { $hour = "0$hour"; } if ($minute !~ /[0-9]{2}/) { $minute = "0$minute"; } if ($second !~ /[0-9]{2}/) { $second = "0$second"; } return sprintf ("%02d:%02d:%02d", $hour, $minute, $second); } return undef; } sub CompareDates { #compares two date and time pairs my $date1 = shift; my $time1 = shift; my $date2 = shift; my $time2 = shift; my ($year1, $month1, $day1) = &DateToymd($date1); my ($hour1, $minute1, $second1) = &TimeTohms($time1); my ($year2, $month2, $day2) = &DateToymd($date2); my ($hour2, $minute2, $second2) = &TimeTohms($time2); # &debug("$year1, $month1, $day1, $year2, $month2, $day2"); my $days = &Delta_Days($year1, $month1, $day1, $year2, $month2, $day2); if ($days > 0) { return 1;} if ($days < 0) { return -1;} if ($days == 0) { #same day, compare 
times my $seconds1 = $second1 + (60 * $minute1) + (3600 * $hour1); my $seconds2 = $second2 + (60 * $minute2) + (3600 * $hour2); if ($seconds1 < $seconds2) { return 1;} if ($seconds1 > $seconds2) { return -1;} if ($seconds1 == $seconds2) { return 0;} } return undef; } sub DateWindow { #compares two date time pairs to see if they are < X seconds apart my $date1 = shift; my $time1 = shift; my $date2 = shift; my $time2 = shift; my $window = shift; my ($year1, $month1, $day1) = &DateToymd($date1); my ($hour1, $minute1, $second1) = &TimeTohms($time1); my ($year2, $month2, $day2) = &DateToymd($date2); my ($hour2, $minute2, $second2) = &TimeTohms($time2); my ($day, $hour, $minute, $second) = &Delta_DHMS($year1, $month1, $day1, $hour1, $minute1, $second1, $year2, $month2, $day2, $hour2, $minute2, $second2); $minute *= 60; $hour *= 3600; $day *= 86400; my $total = $second + $minute + $hour + $day; if (abs($total) < $window) { return 1; } return 0; } sub CheckDateOrder { #takes three dates/times, returns true if they are in chronological order my $date1 = shift; my $time1 = shift; my $date2 = shift; my $time2 = shift; my $date3 = shift; my $time3 = shift; if (&CompareDates($date1, $time1, $date2, $time2) == -1) { return 0; } if (&CompareDates($date2, $time2, $date3, $time3) == -1) { return 0; } return 1; } sub EpochSeconds { #calculates number of seconds since the epoch for the given date/time my $date = shift; my $time = shift; my ($year, $month, $day) = &DateToymd($date); my ($hour, $minute, $second) = &TimeTohms($time); my ($d, $h, $m, $s) = &Delta_DHMS(1970, 1, 1, 0, 0, 0, $year, $month, $day, $hour, $minute, $second); my $seconds = $s + (60 * $m) + (3600 * $h) + (86400 * $d); return $seconds; } sub SecondsToDateTime { #converts seconds since epoch to date/time my $seconds = shift; my $days = int($seconds / 86400); $seconds -= $days * 86400; my $hours = int($seconds / 3600); $seconds -= $hours * 3600; my $minutes = int($seconds / 60); $seconds -= $minutes * 60; my ($year, 
$month, $day, $hour, $minute, $second) = &Add_Delta_DHMS(1970, 1, 1, 0, 0, 0, $days, $hours, $minutes, $seconds); $month = sprintf("%02d", $month); $day = sprintf("%02d", $day); $hour = sprintf("%02d", $hour); $minute = sprintf("%02d", $minute); $second = sprintf("%02d", $second); return "$year-$month-$day", "$hour:$minute:$second"; } sub DateToDayName { my $date = shift; my ($year, $month, $day) = &DateToymd($date); my $name = &Day_of_Week_to_Text(&Day_of_Week($year, $month, $day)); $name =~ /^[A-Za-z]{3}/; $name = $&; return $name; } sub ValiDate { return @_; } sub CheckBusinessDay { # checks to see if date is business day. 1=yes, 0=no my $date = shift; my ($year, $month, $day) = &DateToymd($date); if (Day_of_Week($year,$month,$day) < 6) { return 1; } else { return 0; } } 1; #don't remove this line </pre> chmod 0700 /usr/home/flowbin/queue.pl Setup cronjob: <pre>crontab -e #move flow data into the queue 1,16,31,46 * * * * /usr/home/flowbin/queue.pl</pre> ==== flow processing: i2b ==== <pre>cat > /usr/home/flowbin/processflows-sql.pl #!/usr/bin/perl #use strict; #$debug=1; #$dry=1; my $log = '/usr/home/flowbin/discards.log'; use Data::Dumper; BEGIN { push @INC, "/usr/home/flowbin"; } #my $queuedir = "/usr/home/queue"; my $queuedir = "/usr/home/working"; my $archivedir = "/usr/home/archive"; my $sqldir = "/usr/home/sql"; my $sqldirworking = "/usr/home/sql/tmp"; unless ($dry) { if (-e "$queuedir/.lock") { open(FILE, "$queuedir/.lock"); my $pid = <FILE>; chomp($pid); close(FILE); if (kill(0, $pid)) { #another process is using the queue, bail out exit(0); } else { #dead lock file, remove it `rm $queuedir/.lock`; } } open(FILE, "> $queuedir/.lock"); print FILE "$$\n"; close(FILE); } opendir(DIR, $queuedir); my @files = readdir(DIR); closedir(DIR); foreach my $file (sort @files) { unless($file =~ /^\./) { $file =~ /([0-9]{4}-[0-9]{2}-[0-9]{2})\.([0-9]{2})([0-9]{2})([0-9]{2})/; my $date = "$1 $2:$3:$4"; my $outfile = "$1-$2:$3.sql"; unless (open (SQL, "+> 
$sqldirworking/$outfile")) { die "cant open $sqldirworking/$outfile"; } my $condensedDate = $1; $condensedDate =~ s/-//g; my $iptotal = {}; my $protototal = {}; my $porttotal = {}; &debug("started file $file at "); &debug(`date`); &debug("getting raw flow data (flow-print)"); `cat $queuedir/$file | /usr/local/bin/flow-print -f 5 > /usr/home/working/tmp-$file`; &debug("aggregating data at "); &debug(`date`); unless (open(DATA, "/usr/home/working/tmp-$file")) { die "can't open: $!"; } LOOP: while (my $line = readline DATA) { my @d = split /[\s]+/, $line; if ($d[0] ne '' && $d[0] ne 'Start') { my $addr = 0; my $port = 0; #Start End Sif SrcIPaddress SrcP DIf DstIPaddress DstP P Fl Pkts Octets #0 1 2 3 4 5 6 7 8 9 10 11 #| # outbound = 2, inbound = 1 my (@src_ip) = split '\.', $d[3]; my (@dst_ip) = split '\.', $d[6]; if ($src_ip[0] == 69 && $src_ip[1] == 55 && ($src_ip[2] == 229 || $src_ip[2] == 231)) { # for i2b $d[2] = 2; # hack for outbound bulk traffic counted 2x #if ($src_ip[2] == 231) { $d[11] /= 2; $d[10] /= 2; } } # note- this is where we filter out IPs only found at i2b elsif ($dst_ip[0] == 69 && $dst_ip[1] == 55 && ($dst_ip[2] == 229 || $dst_ip[2] == 231)) { # for i2b $d[2] = 1; } else { next LOOP; } if ($d[2] == 2) { $addr = $d[3]; # if the dst-port is low, store that if ($d[7] <= 1024) { $port = $d[7]; } # if the src-port is low, store that elsif ($d[4] <= 1024) { $port = $d[4]; } else { $port = 99999; } } elsif ($d[2] == 1) { $addr = $d[6]; # if the dst-port is high, assume its return traffic, try to store src-port if low if ($d[7] > 1024) { if ($d[4] <= 1024) { $port = $d[4]; } else { $port = 99999; } } else { $port = $d[7]; } } else { next LOOP; } my (@ip) = split '\.', $addr; unless ($ip[0] == 69) { next LOOP; } unless ($ip[1] == 55) { next LOOP; } unless ($ip[2] == 229 || $ip[2] == 231) { next LOOP; } my $classC = "$ip[0]_$ip[1]_$ip[2]"; # IP dir # if ($d[10] < 2147483647) { $iptotal->{$classC}->{$addr}->{$d[2]}->{'pktTotal'} += $d[10]; } # if ($d[11] < 
2147483647) { $iptotal->{$classC}->{$addr}->{$d[2]}->{'octetTotal'} += $d[11]; } # # if ($d[10] < 2147483647) { $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'pktTotal'} += $d[10]; } # if ($d[11] < 2147483647) { $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'octetTotal'} += $d[11]; } # # if ($d[10] < 2147483647) { $porttotal->{$classC}->{$addr}->{$d[2]}->{$port}->{'pktTotal'} += $d[10]; } # if ($d[11] < 2147483647) { $porttotal->{$classC}->{$addr}->{$d[2]}->{$port}->{'octetTotal'} += $d[11]; } $iptotal->{$classC}->{$addr}->{$d[2]}->{'pktTotal'} += $d[10]; $iptotal->{$classC}->{$addr}->{$d[2]}->{'octetTotal'} += $d[11]; $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'pktTotal'} += $d[10]; $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'octetTotal'} += $d[11]; $porttotal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{$port}->{'pktTotal'} += $d[10]; $porttotal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{$port}->{'octetTotal'} += $d[11]; } } close(DATA); `rm /usr/home/working/tmp-$file`; &debug("processing ip totals at "); &debug(`date`); foreach my $classC (keys(%{$iptotal})) { my @values; foreach my $ip (keys(%{$iptotal->{$classC}})) { foreach my $dir (keys(%{$iptotal->{$classC}->{$ip}})) { my $octets = $iptotal->{$classC}->{$ip}->{$dir}->{'octetTotal'}; my $packets = $iptotal->{$classC}->{$ip}->{$dir}->{'pktTotal'}; # $packets = $packets > 2147483647 ? 0 : $packets; if ($octets > 2147483647) { my $ddir = $dir==1 ? 
'in' : 'out'; #print SQL "$date $ip $ddir $octets\n"; # $octets = 0; } # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction my $id = "$ip-$condensedDate-$dir"; $id =~ s/\.//g; push @values, "('$date', '$ip', $dir, $octets, $packets)"; my $sql = "insert into dailyIpTotals_$classC values ('$id', '$date', '$ip', $dir, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets"; print "$sql\n" if $dry; print SQL "$sql;\n"; # $db->query("insert into ipTotals values ('$date', '$ip', $dir, $octets, $packets)"); } } # break inserts into 100 records at a time &debug("inserting $#values +1 values"); while ($#values > 0) { my $sql = "insert into ipTotals_$classC values "; my $max_index = $#values > 100 ? 100 : $#values; for (my $i=0; $i<=$max_index; $i++) { $sql .= shift @values; $sql .= ','; } chop $sql; print "$sql\n" if $dry; print SQL "$sql;\n"; } } # &debug("processing protocol totals at "); # &debug(`date`); # foreach my $classC (keys(%{$protototal})) { # $db->query("lock tables dailyProtoTotals_$classC write") unless $dry; # my @values; # foreach my $ip (keys(%{$protototal->{$classC}})) { # foreach my $dir (keys(%{$protototal->{$classC}->{$ip}})) { # foreach my $proto (keys(%{$protototal->{$classC}->{$ip}->{$dir}})) { # my $octets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'octetTotal'}; # my $packets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'pktTotal'}; # # $octets = $octets > 2147483647 ? 0 : $octets; # # $packets = $packets > 2147483647 ? 
0 : $packets; # # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto # my $id = "$ip-$condensedDate-$dir-$proto"; # $id =~ s/\.//g; # push @values, "('$date', '$ip', $dir, $proto, $octets, $packets)"; # my $sql = "insert into dailyProtoTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets"; # print "$sql\n" if $dry; # $db->query($sql) unless $dry; # # $db->query("insert into protoTotals values ('$date', '$ip', $dir, $proto, $octets, $packets)"); # } # } # } # $db->query("unlock tables") unless $dry; # my $sql = "insert into protoTotals_$classC values "; # $sql .= join ',', @values; # $db->query("lock tables protoTotals_$classC write") unless $dry; # print "$sql\n" if $dry; # $db->query($sql) unless $dry; # $db->query("unlock tables") unless $dry; # } &debug("processing port totals at "); &debug(`date`); foreach my $classC (keys(%{$porttotal})) { my @values; foreach my $ip (keys(%{$porttotal->{$classC}})) { foreach my $dir (keys(%{$porttotal->{$classC}->{$ip}})) { foreach my $proto (keys(%{$porttotal->{$classC}->{$ip}->{$dir}})) { foreach my $port (keys(%{$porttotal->{$classC}->{$ip}->{$dir}->{$proto}})) { my $octets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'octetTotal'}; my $packets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'pktTotal'}; # $octets = $octets > 2147483647 ? 0 : $octets; # $packets = $packets > 2147483647 ? 
0 : $packets; # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-protocol-port my $id = "$ip-$condensedDate-$dir-$proto-$port"; $id =~ s/\.//g; push @values, "('$date', '$ip', $dir, $proto, $port, $octets, $packets)"; my $sql = "insert into dailyPortTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $port, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets"; print "$sql\n" if $dry; print SQL "$sql;\n"; # $db->query("insert into portTotals values ('$date', '$ip', $dir, $port, $octets, $packets)"); } } } } # break inserts into 100 records at a time &debug("inserting $#values +1 values"); while ($#values > 0) { my $sql = "insert into portTotals_$classC values "; my $max_index = $#values > 100 ? 100 : $#values; for (my $i=0; $i<=$max_index; $i++) { $sql .= shift @values; $sql .= ','; } chop $sql; print "$sql\n" if $dry; print SQL "$sql;\n"; } } # 12 1 8 1 1= 23 # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction # 12 1 8 1 1 3=26 # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto # 12 1 8 1 1 5=28 # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-port #print "finished at "; #print `date`; `mv $queuedir/$file $archivedir` unless $dry; close(SQL); `bzip2 $sqldirworking/$outfile`; `mv $sqldirworking/$outfile.bz2 $sqldir/`; } } `rm $queuedir/.lock` unless $dry; sub debug { my $message = shift; if ($debug) { print "$message\n"; } } # var full during ft-v05.2005-03-28.084500-0800 and # 2005-02-24 69.55.226 # all port/daily totals before 2005-04-07 </pre> This script sends the sql files to the traffic server for processing: <pre>cat > /usr/home/flowbin/sendsql.pl #!/usr/bin/perl #use strict; #$debug=1; #$dry=1; my $remote = "69.55.233.199"; my $sqldir = "/usr/home/sql"; my $archive = "/usr/home/archive"; my $sqldirremote = "/data/bwdb2/pending/"; my @err; unless ($dry) { if (-e "$sqldir/.lock") { open(FILE, "$sqldir/.lock"); my $pid = <FILE>; chomp($pid); close(FILE); if (kill(0, $pid)) { #another process is using 
the queue, bail out exit(0); } else { #dead lock file, remove it `rm $sqldir/.lock`; } } open(FILE, "> $sqldir/.lock"); print FILE "$$\n"; close(FILE); } opendir(DIR, $sqldir); my @files = readdir(DIR); closedir(DIR); foreach my $file (sort @files) { next unless $file =~ /bz2$/; my $r = `scp -Cq $sqldir/$file $remote:$sqldirremote 2>&1`; # print "scp $sqldir/$file $remote:$sqldirremote"; unless ($?==0) { push @err, "scp -Cq $sqldir/$file $remote:$sqldirremote ($r)"; } else { `mv $sqldir/$file $archive`; `ssh $remote mv $sqldirremote/$file $sqldirremote/${file}.done`; } } `rm $sqldir/.lock` unless $dry; if (@err) { email_support('bwdb2: sendsql.pl error',join "\n", @err); } sub email_support { my $subj=shift; my $body=shift; use Mail::Sendmail; # prepare message my %mail = ( To => 'support@johncompanies.com,dave@johncompanies.com', From => 'support@johncompanies.com', Subject => $subj, Message => $body, smtp => 'mail.johncompanies.com', ); sendmail(%mail) || warn "Error: $Mail::Sendmail::error"; } sub debug { my $message = shift; if ($debug) { print "$message\n"; } } # var full during ft-v05.2005-03-28.084500-0800 and # 2005-02-24 69.55.226 # all port/daily totals before 2005-04-07 </pre> <pre>crontab -e #process flows 2,17,32,47 * * * * /usr/home/flowbin/processflows-sql.pl #move sql commands to traffic db 8,23,38,53 * * * * /usr/home/flowbin/sendsql.pl </pre> ==== flow processing: castle ==== <pre> cat > /usr/home/flowbin/processflows.pl #!/usr/bin/perl #use strict; #$debug=1; #$dry=1; my $log = '/usr/home/flowbin/discards.log'; use Data::Dumper; BEGIN { push @INC, "/usr/home/flowbin"; } use db; #my $queuedir = "/usr/home/queue"; my $queuedir = "/usr/home/working"; my $archivedir = "/usr/home/archive"; unless ($dry) { if (-e "$queuedir/.lock") { open(FILE, "$queuedir/.lock"); my $pid = <FILE>; chomp($pid); close(FILE); if (kill(0, $pid)) { #another process is using the queue, bail out exit(0); } else { #dead lock file, remove it `rm $queuedir/.lock`; } } 
open(FILE, "> $queuedir/.lock"); print FILE "$$\n"; close(FILE); } my $db = db->new(); $db->connect('traffic', '', 'root', '5over3') || die $db->{'error'}; opendir(DIR, $queuedir); my @files = readdir(DIR); closedir(DIR); foreach my $file (sort @files) { unless($file =~ /^\./) { $file =~ /([0-9]{4}-[0-9]{2}-[0-9]{2})\.([0-9]{2})([0-9]{2})([0-9]{2})/; my $date = "$1 $2:$3:$4"; my $condensedDate = $1; $condensedDate =~ s/-//g; my $iptotal = {}; my $protototal = {}; my $porttotal = {}; &debug("started file $file at "); &debug(`date`); &debug("getting raw flow data (flow-print)"); `cat $queuedir/$file | /usr/local/bin/flow-print -f 5 > /usr/home/working/tmp-$file`; &debug("aggregating data at "); &debug(`date`); unless (open(DATA, "/usr/home/working/tmp-$file")) { die "can't open: $!"; } LOOP: while (my $line = readline DATA) { my @d = split /[\s]+/, $line; if ($d[0] ne '' && $d[0] ne 'Start') { my $addr = 0; my $port = 0; #Start End Sif SrcIPaddress SrcP DIf DstIPaddress DstP P Fl Pkts Octets #0 1 2 3 4 5 6 7 8 9 10 11 #| # outbound = 2, inbound = 1 my (@src_ip) = split '\.', $d[3]; my (@dst_ip) = split '\.', $d[6]; if ($src_ip[0] == 69 && $src_ip[1] == 55 && $src_ip[2] >= 224 && $src_ip[2] <= 239 && $src_ip[2] != 229 && $src_ip[2] != 231) { # for castle # if ($src_ip[0] == 69 && $src_ip[1] == 55 && $src_ip[2] == 229) { # for i2b $d[2] = 2; # hack for outbound bulk traffic counted 2x if ($dst_ip[2] == 234) { $d[11] /= 2; $d[10] /= 2; } } elsif ($dst_ip[0] == 69 && $dst_ip[1] == 55 && $dst_ip[2] >= 224 && $dst_ip[2] <= 239 && $dst_ip[2] != 229 && $dst_ip[2] != 231) { # for castle # elsif ($dst_ip[0] == 69 && $dst_ip[1] == 55 && $dst_ip[2] == 229) { # for i2b $d[2] = 1; } else { next LOOP; } if ($d[2] == 2) { $addr = $d[3]; # if the dst-port is low, store that if ($d[7] <= 1024) { $port = $d[7]; } # if the src-port is low, store that elsif ($d[4] <= 1024) { $port = $d[4]; } else { $port = 99999; } } elsif ($d[2] == 1) { $addr = $d[6]; # if the dst-port is high, assume 
its return traffic, try to store src-port if low if ($d[7] > 1024) { if ($d[4] <= 1024) { $port = $d[4]; } else { $port = 99999; } } else { $port = $d[7]; } } else { next LOOP; } my (@ip) = split '\.', $addr; unless ($ip[0] == 69) { next LOOP; } unless ($ip[1] == 55) { next LOOP; } unless ($ip[2] >= 224 && $ip[2] <= 239 && $ip[2] != 229 && $ip[2] != 231) { next LOOP; } # unless ($ip[2] == 229) { next LOOP; } my $classC = "$ip[0]_$ip[1]_$ip[2]"; # IP dir # if ($d[10] < 2147483647) { $iptotal->{$classC}->{$addr}->{$d[2]}->{'pktTotal'} += $d[10]; } # if ($d[11] < 2147483647) { $iptotal->{$classC}->{$addr}->{$d[2]}->{'octetTotal'} += $d[11]; } # # if ($d[10] < 2147483647) { $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'pktTotal'} += $d[10]; } # if ($d[11] < 2147483647) { $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'octetTotal'} += $d[11]; } # # if ($d[10] < 2147483647) { $porttotal->{$classC}->{$addr}->{$d[2]}->{$port}->{'pktTotal'} += $d[10]; } # if ($d[11] < 2147483647) { $porttotal->{$classC}->{$addr}->{$d[2]}->{$port}->{'octetTotal'} += $d[11]; } $iptotal->{$classC}->{$addr}->{$d[2]}->{'pktTotal'} += $d[10]; $iptotal->{$classC}->{$addr}->{$d[2]}->{'octetTotal'} += $d[11]; $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'pktTotal'} += $d[10]; $protototal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{'octetTotal'} += $d[11]; $porttotal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{$port}->{'pktTotal'} += $d[10]; $porttotal->{$classC}->{$addr}->{$d[2]}->{$d[8]}->{$port}->{'octetTotal'} += $d[11]; } } close(DATA); `rm /usr/home/working/tmp-$file`; &debug("processing ip totals at "); &debug(`date`); foreach my $classC (keys(%{$iptotal})) { $db->query("lock tables dailyIpTotals_$classC write") unless $dry; my @values; foreach my $ip (keys(%{$iptotal->{$classC}})) { foreach my $dir (keys(%{$iptotal->{$classC}->{$ip}})) { my $octets = $iptotal->{$classC}->{$ip}->{$dir}->{'octetTotal'}; my $packets = $iptotal->{$classC}->{$ip}->{$dir}->{'pktTotal'}; # $packets = 
$packets > 2147483647 ? 0 : $packets; if ($octets > 2147483647) { my $ddir = $dir==1 ? 'in' : 'out'; `echo "$date $ip $ddir $octets\n" >> $log`; # $octets = 0; } # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction my $id = "$ip-$condensedDate-$dir"; $id =~ s/\.//g; push @values, "('$date', '$ip', $dir, $octets, $packets)"; my $sql = "insert into dailyIpTotals_$classC values ('$id', '$date', '$ip', $dir, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets"; print "$sql\n" if $dry; $db->query($sql) unless $dry; # $db->query("insert into ipTotals values ('$date', '$ip', $dir, $octets, $packets)"); } } $db->query("unlock tables") unless $dry; $db->query("lock tables ipTotals_$classC write") unless $dry; # break inserts into 100 records at a time &debug("inserting $#values +1 values"); while ($#values > 0) { my $sql = "insert into ipTotals_$classC values "; my $max_index = $#values > 100 ? 100 : $#values; for (my $i=0; $i<=$max_index; $i++) { $sql .= shift @values; $sql .= ','; } chop $sql; print "$sql\n" if $dry; $db->query($sql) unless $dry; } $db->query("unlock tables") unless $dry; } sleep 20; # &debug("processing protocol totals at "); # &debug(`date`); # foreach my $classC (keys(%{$protototal})) { # $db->query("lock tables dailyProtoTotals_$classC write") unless $dry; # my @values; # foreach my $ip (keys(%{$protototal->{$classC}})) { # foreach my $dir (keys(%{$protototal->{$classC}->{$ip}})) { # foreach my $proto (keys(%{$protototal->{$classC}->{$ip}->{$dir}})) { # my $octets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'octetTotal'}; # my $packets = $protototal->{$classC}->{$ip}->{$dir}->{$proto}->{'pktTotal'}; # # $octets = $octets > 2147483647 ? 0 : $octets; # # $packets = $packets > 2147483647 ? 
0 : $packets; # # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto # my $id = "$ip-$condensedDate-$dir-$proto"; # $id =~ s/\.//g; # push @values, "('$date', '$ip', $dir, $proto, $octets, $packets)"; # my $sql = "insert into dailyProtoTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets"; # print "$sql\n" if $dry; # $db->query($sql) unless $dry; # # $db->query("insert into protoTotals values ('$date', '$ip', $dir, $proto, $octets, $packets)"); # } # } # } # $db->query("unlock tables") unless $dry; # my $sql = "insert into protoTotals_$classC values "; # $sql .= join ',', @values; # $db->query("lock tables protoTotals_$classC write") unless $dry; # print "$sql\n" if $dry; # $db->query($sql) unless $dry; # $db->query("unlock tables") unless $dry; # } &debug("processing port totals at "); &debug(`date`); foreach my $classC (keys(%{$porttotal})) { $db->query("lock tables dailyPortTotals_$classC write") unless $dry; my @values; foreach my $ip (keys(%{$porttotal->{$classC}})) { foreach my $dir (keys(%{$porttotal->{$classC}->{$ip}})) { foreach my $proto (keys(%{$porttotal->{$classC}->{$ip}->{$dir}})) { foreach my $port (keys(%{$porttotal->{$classC}->{$ip}->{$dir}->{$proto}})) { my $octets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'octetTotal'}; my $packets = $porttotal->{$classC}->{$ip}->{$dir}->{$proto}->{$port}->{'pktTotal'}; # $octets = $octets > 2147483647 ? 0 : $octets; # $packets = $packets > 2147483647 ? 
0 : $packets; # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-protocol-port my $id = "$ip-$condensedDate-$dir-$proto-$port"; $id =~ s/\.//g; push @values, "('$date', '$ip', $dir, $proto, $port, $octets, $packets)"; my $sql = "insert into dailyPortTotals_$classC values ('$id', '$date', '$ip', $dir, $proto, $port, $octets, $packets) ON DUPLICATE KEY UPDATE octets=octets+$octets, packets=packets+$packets"; print "$sql\n" if $dry; $db->query($sql) unless $dry; # $db->query("insert into portTotals values ('$date', '$ip', $dir, $port, $octets, $packets)"); } } } } $db->query("unlock tables") unless $dry; $db->query("lock tables portTotals_$classC write") unless $dry; # break inserts into 100 records at a time &debug("inserting $#values +1 values"); while ($#values > 0) { my $sql = "insert into portTotals_$classC values "; my $max_index = $#values > 100 ? 100 : $#values; for (my $i=0; $i<=$max_index; $i++) { $sql .= shift @values; $sql .= ','; } chop $sql; print "$sql\n" if $dry; $db->query($sql) unless $dry; } $db->query("unlock tables") unless $dry; sleep 10; } # 12 1 8 1 1= 23 # dailyIpTotals.id = ip(no .'s)-yyyymmdd-direction # 12 1 8 1 1 3=26 # dailyProtoTotals.id = ip(no .'s)-yyyymmdd-direction-proto # 12 1 8 1 1 5=28 # dailyPortTotals.id = ip(no .'s)-yyyymmdd-direction-port #print "finished at "; #print `date`; `mv $queuedir/$file $archivedir` unless $dry; } } `rm $queuedir/.lock` unless $dry; sub debug { my $message = shift; if ($debug) { print "$message\n"; } } # var full during ft-v05.2005-03-28.084500-0800 and # 2005-02-24 69.55.226 # all port/daily totals before 2005-04-07 </pre> <pre> cat > /usr/home/flowbin/db.pm #!/usr/bin/perl # # $Header: /usr/cvs/newgw/lib/db.pm,v 1.4 2003/06/05 18:20:01 glenn Exp $ # # Copyright (c) 2003 # e-Monitoring Networks, Inc. All rights reserved. 
#
#
package db;
use strict;
use DBI;

sub new {
    my $class = shift;
    my $self = {};
    $self->{'debug'} = 0;
    bless $self, $class;
    return $self;
}

sub connect {
    my $self = shift;
    my $dbname = shift;
    my $dbhost = shift;
    my $dbuser = shift;
    my $dbpass = shift;
    my $host = '';
    if (defined($dbhost)) {
        $host = ";host=$dbhost";
    }
    eval {
        $self->debug("connecting to: DBI:mysql:database=$dbname;$host", 1);
        $self->{'dbh'} = DBI->connect("DBI:mysql:database=$dbname;$host", $dbuser, $dbpass);
    };
    if ($self->{'dbh'}) {
        return 1;
    }
    $self->{'error'} = "Error connecting to database $@";
    $self->debug("Error connecting to database $@");
    return 0;
}

sub query {
    my $self = shift;
    my $query = shift;
    $self->debug($query, 1);
    my $sth;
    eval {
        $sth = $self->{'dbh'}->prepare($query);
    };
    unless ($sth) {
        $self->{'error'} = "error preparing query $@";
        $self->debug("error preparing query $@");
        return undef;
    }
    my $qty;
    eval {
        $qty = $sth->execute;
    };
    unless ($qty) {
        $self->{'error'} = "error executing query $@";
        warn "error executing query $@ $query";
        return undef;
    }
    $self->debug("returning $qty, $sth from query", 6);
    return ($qty, $sth);
}

sub disconnect {
    my $self = shift;
    $self->{'dbh'}->disconnect;
    return 0;
}

sub debug {
    my $self = shift;
    my $msg = shift;
    my $level = shift || 0;
    if ($level < $self->{'debug'}) {
        print "$msg\n";
    }
    return 0;
}

1;
</pre>

 mkdir /usr/home/archive
 mkdir -p /usr/home/sql/tmp

<pre>crontab -e
#process flows
2,17,32,47 * * * * /usr/home/flowbin/processflows.pl</pre>

==== setup traffic db ====
* Install mysql:
<pre>cd /usr/ports/databases/mysql50-server
make install clean</pre>

 cat >> /etc/rc.conf
 mysql_enable="YES"

Move db data dir:
 /usr/local/etc/rc.d/mysql-server stop
 mkdir /usr/home/database/
 mv /var/db/mysql/* /usr/home/database/
 chown -R mysql:mysql /usr/home/database

Edit database location in startup script:
 vi /usr/local/etc/rc.d/mysql-server
 # : ${mysql_dbdir="/var/db/mysql"}
 : ${mysql_dbdir="/usr/home/database"}

 /usr/local/etc/rc.d/mysql-server start

* Install mysql perl database modules:
<pre>
cd /usr/ports/databases/p5-DBI
make install clean
cd /usr/ports/databases/p5-DBD-mysql50
make install clean
(no to SSL support)
</pre>

* Setting up database
<pre>
rehash
/usr/local/etc/rc.d/mysql-server start
mysql -u root
create database traffic;
grant all on *.* to root@localhost identified by '5over3';
grant all on traffic.* to jc@10.1.4.5 identified by '2gMKY3Wt';
</pre>

If this were a new server, we'd set up new tables; see [[#mysql_2|mysql]] for how those tables would be set up. Here we assume the data is being moved from an existing db. To copy it, run this on the current traffic db:
 rsync -av --progress /usr/home/database/traffic/ 10.1.4.203:/usr/home/database/traffic/

When you're ready to do the cutover, shut down mysql on both hosts and do one last sync.

==== process flows from bwdb2 ====
On traffic database server (bwdb):
<pre>crontab -e
#import sql from bwdb2
10,25,40,55 * * * * /usr/home/flowbin/processsql.pl</pre>

Add access to mysql:
<pre>mysql -u root -p
grant all on traffic.* to bwdb2@localhost identified by 's1lver4d';
</pre>

<pre>cat > /usr/home/flowbin/processsql.pl
#!/usr/bin/perl
#use strict;
#$debug=1;
#$dry=1;
my $sqldir = "/usr/home/bwdb2/pending";
my $mysql = '/usr/local/bin/mysql';
my @err;

unless ($dry) {
    if (-e "$sqldir/.lock") {
        open(FILE, "$sqldir/.lock");
        my $pid = <FILE>;
        chomp($pid);
        close(FILE);
        if (kill(0, $pid)) {
            #another process is using the queue, bail out
            exit(0);
        } else {
            #dead lock file, remove it
            `rm $sqldir/.lock`;
        }
    }
    open(FILE, "> $sqldir/.lock");
    print FILE "$$\n";
    close(FILE);
}

opendir(DIR, $sqldir);
my @files = readdir(DIR);
closedir(DIR);

foreach my $file (sort @files) {
    next unless $file =~ /done$/;
    my $r = `bzcat $sqldir/$file | $mysql -u bwdb2 -ps1lver4d traffic`;
    unless ($?==0) {
        push @err, "bzcat $sqldir/$file | $mysql -u bwdb2 -pxxxxx traffic ($r)";
    } else {
        `rm $sqldir/$file`;
    }
}

`rm $sqldir/.lock` unless $dry;

if (@err) {
    email_support('bwdb: processsql.pl error',join "\n",
        @err);
}

sub email_support {
    my $subj=shift;
    my $body=shift;
    use Mail::Sendmail;
    # prepare message
    my %mail = (
        To => 'dave@johncompanies.com',
        From => 'support@johncompanies.com',
        Subject => $subj,
        Message => $body,
        smtp => 'mail.johncompanies.com',
    );
    sendmail(%mail) || warn "Error: $Mail::Sendmail::error";
}

sub debug {
    my $message = shift;
    if ($debug) {
        print "$message\n";
    }
}
</pre>

 chmod 0700 /usr/home/flowbin/processsql.pl

Make sure bwdb is reachable from the outside only by bwdb2:

On nat, add to <tt>/etc/ipnat.rules</tt>
<pre># bwdb
bimap fxp0 10.1.4.203/32 -> 69.55.233.199/32</pre>

Reload:
 ipnat -C -F -f /etc/ipnat.rules

Set up firewall rule on firewall:
 ipfw add 00094 allow ip from 66.181.18.5 to 69.55.233.199 22
 ipfw add 00094 deny ip from any to 69.55.233.199

Set up firewall on bwdb to restrict access now that it's nat'd:
<pre>
cat >> /usr/local/etc/rc.d/boot.sh
ipfw add 1 allow tcp from any to any established
ipfw add 2 allow ip from 10.1.4.0/24,66.181.18.5,69.55.233.195 to me 22
ipfw add 3 allow ip from 10.1.4.5 to me 3306
ipfw add 4 allow ip from 69.55.225.225 53 to me
ipfw add 5 allow ip from 69.55.230.2 25 to me
ipfw add 6 allow ip from me to me 4444
ipfw add 7 allow icmp from any to me
ipfw add 8 allow udp from 10.1.4.203 to 10.1.4.203 dst-port 4444
ipfw add 9 allow udp from 10.1.4.5 to me 161
ipfw add 100 deny ip from any to me
</pre>

 chmod 0700 /usr/local/etc/rc.d/boot.sh

From bwdb2, add ssh key:
 cat /root/.ssh/id_dsa.pub | ssh 69.55.233.199 'cat - >> /root/.ssh/authorized_keys'

Confirm no password access:
 ssh 69.55.233.199 hostname
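
The "confirm no password access" check above can hang at a password prompt if the key was not installed correctly. A minimal sketch of a non-interactive version follows, assuming OpenSSH's <tt>BatchMode</tt> and <tt>ConnectTimeout</tt> options are available on bwdb2. The function name <tt>check_key_login</tt> and the <tt>SSH_BIN</tt> override are hypothetical and not part of the original setup; <tt>SSH_BIN</tt> only exists so the check can be tried without a real host.

```shell
#!/bin/sh
# Sketch: confirm key-based login works without falling back to a password.
# check_key_login and SSH_BIN are illustrative names, not part of the
# original notes. SSH_BIN defaults to plain ssh.
check_key_login() {
    host="$1"
    # BatchMode=yes makes ssh fail instead of prompting for a password;
    # ConnectTimeout keeps an unreachable host from hanging the check.
    if "${SSH_BIN:-ssh}" -o BatchMode=yes -o ConnectTimeout=10 "$host" hostname; then
        echo "key login to $host OK"
    else
        echo "key login to $host FAILED" >&2
        return 1
    fi
}

# Example (run from bwdb2): check_key_login 69.55.233.199
```

A non-zero return here means ssh could not log in with the key alone, which is the case the cron-driven transfer from bwdb2 would also hit.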