mirror of
https://github.com/ZoneMinder/zoneminder.git
synced 2026-01-28 16:11:14 -05:00
Functional new architecture version.
git-svn-id: http://svn.zoneminder.com/svn/zm/trunk@1739 e3e1d417-86f3-4887-817a-d78f3d33393f
This commit is contained in:
@@ -38,41 +38,517 @@ use constant DBG_LEVEL => 1; # 0 is errors, warnings and info only, > 0 for debu
|
||||
|
||||
use constant MAX_CONNECT_DELAY => 10;
|
||||
use constant MONITOR_RELOAD_INTERVAL => 300;
|
||||
use constant SELECT_TIMEOUT => 0.25;
|
||||
|
||||
# Now define the trigger connections, can be inet socket, unix socket or file based
|
||||
# The isub field should point to a subroutine to convert input messages if necessary
|
||||
# The osub field should point to a subroutine to convert output messages if necessary
|
||||
#
|
||||
# Define classes for any channels that triggers may go in and/or out over
|
||||
#
|
||||
|
||||
my @conns = (
|
||||
# Base channel class
|
||||
package Channel;
|
||||
|
||||
use ZoneMinder::Debug;
|
||||
|
||||
our $AUTOLOAD;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my $self = {};
|
||||
$self->{readable} = !undef;
|
||||
$self->{writeable} = !undef;
|
||||
$self->{selectable} = undef;
|
||||
$self->{state} = 'closed';
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub clone
|
||||
{
|
||||
my $self = shift;
|
||||
my $clone = { %$self };
|
||||
bless $clone, ref $self;
|
||||
}
|
||||
|
||||
sub open()
|
||||
{
|
||||
my $self = shift;
|
||||
my $class = ref($self) or die( "Can't get class for non object $self" );
|
||||
die( "Abstract base class method called for object of class $class" );
|
||||
}
|
||||
|
||||
sub close()
|
||||
{
|
||||
my $self = shift;
|
||||
my $class = ref($self) or die( "Can't get class for non object $self" );
|
||||
die( "Abstract base class method called for object of class $class" );
|
||||
}
|
||||
|
||||
sub getState()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{state} );
|
||||
}
|
||||
|
||||
sub isOpen()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{state} eq "open" );
|
||||
}
|
||||
|
||||
sub isConnected()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{state} eq "connected" );
|
||||
}
|
||||
|
||||
sub DESTROY
|
||||
{
|
||||
}
|
||||
|
||||
sub AUTOLOAD
|
||||
{
|
||||
my $self = shift;
|
||||
my $class = ref($self) || die( "$self not object" );
|
||||
my $name = $AUTOLOAD;
|
||||
$name =~ s/.*://;
|
||||
if ( !exists($self->{$name}) )
|
||||
{
|
||||
name => "S1",
|
||||
type => "inet",
|
||||
port => "6802",
|
||||
in => !undef,
|
||||
out => !undef,
|
||||
isub => sub { $_[0] =~ s/-/|/g; return( $_[0] ); },
|
||||
osub => sub { $_[0] =~ s/\|/-/g; return( $_[0] ); },
|
||||
ids => [ 1 ],
|
||||
},
|
||||
die( "Can't access $name member of object of class $class" );
|
||||
}
|
||||
return( $self->{$name} );
|
||||
}
|
||||
|
||||
# Handle based channel
|
||||
package Channel::Handle;
|
||||
our @ISA = qw(Channel);
|
||||
|
||||
use ZoneMinder::Debug qw(:all);
|
||||
use POSIX;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my $port = shift;
|
||||
my $self = Channel->new();
|
||||
$self->{handle} = undef;
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub close()
|
||||
{
|
||||
my $self = shift;
|
||||
close( $self->{handle} );
|
||||
$self->{state} = 'closed';
|
||||
$self->{handle} = undef;
|
||||
}
|
||||
|
||||
sub read()
|
||||
{
|
||||
my $self = shift;
|
||||
my $buffer;
|
||||
my $nbytes = sysread( $self->{handle}, $buffer, POSIX::BUFSIZ );
|
||||
if ( !$nbytes )
|
||||
{
|
||||
name => "S2",
|
||||
type => "unix",
|
||||
path => "/tmp/test.sock",
|
||||
in => !undef,
|
||||
out => !undef,
|
||||
isub => undef,
|
||||
osub => undef,
|
||||
},
|
||||
return( undef );
|
||||
}
|
||||
Debug( "Read '$buffer' ($nbytes bytes)\n" );
|
||||
return( $buffer );
|
||||
}
|
||||
|
||||
sub write()
|
||||
{
|
||||
my $self = shift;
|
||||
my $buffer = shift;
|
||||
my $nbytes = syswrite( $self->{handle}, $buffer );
|
||||
if ( !defined( $nbytes) || $nbytes < length($buffer) )
|
||||
{
|
||||
name => "S3",
|
||||
type => "file",
|
||||
path => "/dev/ttyS0",
|
||||
in => undef,
|
||||
out => undef,
|
||||
isub => undef,
|
||||
osub => undef,
|
||||
},
|
||||
);
|
||||
Error( "Unable to write buffer '".$buffer.", expected ".length($buffer)." bytes, sent ".$nbytes.": $!\n" );
|
||||
return( undef );
|
||||
}
|
||||
Debug( "Wrote '$buffer' ($nbytes bytes)\n" );
|
||||
return( !undef );
|
||||
}
|
||||
|
||||
sub fileno()
|
||||
{
|
||||
my $self = shift;
|
||||
return( defined($self->{handle})?fileno($self->{handle}):-1 );
|
||||
}
|
||||
|
||||
# Spawning selectable channels
|
||||
package Channel::Spawning;
|
||||
our @ISA = qw(Channel::Handle);
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my $port = shift;
|
||||
my $self = Channel::Handle->new();
|
||||
$self->{spawns} = !undef;
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
# Inet TCP socket channel
|
||||
package Channel::Inet;
|
||||
our @ISA = qw(Channel::Spawning);
|
||||
|
||||
use Socket;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my %params = @_;
|
||||
my $self = Channel::Spawning->new();
|
||||
$self->{selectable} = !undef;
|
||||
$self->{port} = $params{port};
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub open()
|
||||
{
|
||||
my $self = shift;
|
||||
local *sfh;
|
||||
my $saddr = sockaddr_in( $self->{port}, INADDR_ANY );
|
||||
socket( *sfh, PF_INET, SOCK_STREAM, getprotobyname('tcp') ) or die( "Can't open socket: $!" );
|
||||
setsockopt( *sfh, SOL_SOCKET, SO_REUSEADDR, 1 );
|
||||
bind( *sfh, $saddr ) or die( "Can't bind: $!" );
|
||||
listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" );
|
||||
$self->{state} = 'open';
|
||||
$self->{handle} = *sfh;
|
||||
}
|
||||
|
||||
sub _spawn( $ )
|
||||
{
|
||||
my $self = shift;
|
||||
my $new_handle = shift;
|
||||
my $clone = $self->clone();
|
||||
$clone->{handle} = $new_handle;
|
||||
$clone->{state} = 'connected';
|
||||
return( $clone );
|
||||
}
|
||||
|
||||
sub accept()
|
||||
{
|
||||
my $self = shift;
|
||||
local *cfh;
|
||||
my $paddr = accept( *cfh, $self->{handle} );
|
||||
return( $self->_spawn( *cfh ) );
|
||||
}
|
||||
|
||||
# Unix socket channel
|
||||
package Channel::Unix;
|
||||
our @ISA = qw(Channel::Spawning);
|
||||
|
||||
use Socket;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my %params = @_;
|
||||
my $self = Channel->new;
|
||||
$self->{selectable} = !undef;
|
||||
$self->{path} = $params{path};
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub open()
|
||||
{
|
||||
my $self = shift;
|
||||
local *sfh;
|
||||
unlink( $self->{path} );
|
||||
my $saddr = sockaddr_un( $self->{path} );
|
||||
socket( *sfh, PF_UNIX, SOCK_STREAM, 0 ) or die( "Can't open socket: $!" );
|
||||
bind( *sfh, $saddr ) or die( "Can't bind: $!" );
|
||||
listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" );
|
||||
$self->{handle} = *sfh;
|
||||
}
|
||||
|
||||
# Simple file channel
|
||||
package Channel::File;
|
||||
our @ISA = qw(Channel::Handle);
|
||||
|
||||
use Fcntl;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my %params = @_;
|
||||
my $self = Channel::Handle->new;
|
||||
$self->{path} = $params{path};
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub open()
|
||||
{
|
||||
my $self = shift;
|
||||
local *sfh;
|
||||
#sysopen( *sfh, $conn->{path}, O_NONBLOCK|O_RDONLY ) or die( "Can't sysopen: $!" );
|
||||
#open( *sfh, "<".$conn->{path} ) or die( "Can't open: $!" );
|
||||
open( *sfh, "+<".$self->{path} ) or die( "Can't open: $!" );
|
||||
$self->{state} = 'open';
|
||||
$self->{handle} = *sfh;
|
||||
}
|
||||
|
||||
# Serial device channel
|
||||
package Channel::Serial;
|
||||
our @ISA = qw(Channel);
|
||||
|
||||
use ZoneMinder::Debug qw(:all);
|
||||
use Device::SerialPort;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my %params = @_;
|
||||
my $self = Channel->new;
|
||||
$self->{path} = $params{path};
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub open()
|
||||
{
|
||||
my $self = shift;
|
||||
my $device = new Device::SerialPort( $self->{path} );
|
||||
$device->baudrate(9600);
|
||||
$device->databits(8);
|
||||
$device->parity('none');
|
||||
$device->stopbits(1);
|
||||
$device->handshake('none');
|
||||
|
||||
$device->read_const_time(50);
|
||||
$device->read_char_time(10);
|
||||
|
||||
$self->{device} = $device;
|
||||
$self->{state} = 'open';
|
||||
$self->{state} = 'connected';
|
||||
}
|
||||
|
||||
sub close()
|
||||
{
|
||||
my $self = shift;
|
||||
$self->{device}->close();
|
||||
$self->{state} = 'closed';
|
||||
}
|
||||
|
||||
sub read()
|
||||
{
|
||||
my $self = shift;
|
||||
my $buffer = $self->{device}->lookfor();
|
||||
if ( !$buffer || !length($buffer) )
|
||||
{
|
||||
return( undef );
|
||||
}
|
||||
Debug( "Read '$buffer' (".length($buffer)." bytes)\n" );
|
||||
return( $buffer );
|
||||
}
|
||||
|
||||
sub write()
|
||||
{
|
||||
my $self = shift;
|
||||
my $buffer = shift;
|
||||
my $nbytes = $self->{device}->write( $buffer );
|
||||
$self->{device}->write_drain();
|
||||
if ( !defined( $nbytes) || $nbytes < length($buffer) )
|
||||
{
|
||||
Error( "Unable to write buffer '".$buffer.", expected ".length($buffer)." bytes, sent ".$nbytes.": $!\n" );
|
||||
return( undef );
|
||||
}
|
||||
Debug( "Wrote '$buffer' ($nbytes bytes)\n" );
|
||||
return( !undef );
|
||||
}
|
||||
|
||||
|
||||
package Connection;
|
||||
use ZoneMinder::Debug;
|
||||
|
||||
our $AUTOLOAD;
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my %params = @_;
|
||||
my $self = {};
|
||||
$self->{name} = $params{name};
|
||||
$self->{channel} = $params{channel};
|
||||
$self->{input} = $params{mode} =~ /r/i;
|
||||
$self->{output} = $params{mode} =~ /w/i;
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub clone
|
||||
{
|
||||
my $self = shift;
|
||||
my $clone = { %$self };
|
||||
bless $clone, ref $self;
|
||||
return( $clone );
|
||||
}
|
||||
|
||||
sub _spawn( $ )
|
||||
{
|
||||
my $self = shift;
|
||||
my $new_channel = shift;
|
||||
my $clone = $self->clone();
|
||||
$clone->{channel} = $new_channel;
|
||||
return( $clone );
|
||||
}
|
||||
|
||||
sub accept()
|
||||
{
|
||||
my $self = shift;
|
||||
my $new_channel = $self->{channel}->accept();
|
||||
return( $self->_spawn( $new_channel ) );
|
||||
}
|
||||
|
||||
sub open()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{channel}->open() );
|
||||
}
|
||||
|
||||
sub close()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{channel}->close() );
|
||||
}
|
||||
|
||||
sub fileno()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{channel}->fileno() );
|
||||
}
|
||||
|
||||
sub isOpen()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{channel}->isOpen() );
|
||||
}
|
||||
|
||||
sub isConnected()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{channel}->isConnected() );
|
||||
}
|
||||
|
||||
sub canRead()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{input} && $self->isConnected() );
|
||||
}
|
||||
|
||||
sub canWrite()
|
||||
{
|
||||
my $self = shift;
|
||||
return( $self->{output} && $self->isConnected() );
|
||||
}
|
||||
|
||||
sub getMessages
|
||||
{
|
||||
my $self = shift;
|
||||
my $buffer = $self->{channel}->read();
|
||||
|
||||
return( undef ) if ( !defined($buffer) );
|
||||
|
||||
my @messages = split( /\r?\n/, $buffer );
|
||||
return( \@messages );
|
||||
}
|
||||
|
||||
sub putMessages
|
||||
{
|
||||
my $self = shift;
|
||||
my $messages = shift;
|
||||
|
||||
if ( @$messages )
|
||||
{
|
||||
my $buffer = join( "\n", @$messages );
|
||||
$buffer .= "\n";
|
||||
if ( !$self->{channel}->write( $buffer ) )
|
||||
{
|
||||
Error( "Unable to write buffer '".$buffer." to connection ".$self->{name}." (".$self->fileno().")\n" );
|
||||
}
|
||||
}
|
||||
return( undef );
|
||||
}
|
||||
|
||||
sub DESTROY
|
||||
{
|
||||
}
|
||||
|
||||
sub AUTOLOAD
|
||||
{
|
||||
my $self = shift;
|
||||
my $class = ref($self) || die( "$self not object" );
|
||||
my $name = $AUTOLOAD;
|
||||
$name =~ s/.*://;
|
||||
if ( exists($self->{$name}) )
|
||||
{
|
||||
return( $self->{$name} );
|
||||
}
|
||||
elsif ( defined($self->{channel}) )
|
||||
{
|
||||
if ( exists($self->{channel}->{$name}) )
|
||||
{
|
||||
return( $self->{channel}->{$name} );
|
||||
}
|
||||
}
|
||||
die( "Can't access $name member of object of class $class" );
|
||||
}
|
||||
|
||||
package Connection::Special;
|
||||
our @ISA = qw(Connection);
|
||||
|
||||
sub new
|
||||
{
|
||||
my $class = shift;
|
||||
my $path = shift;
|
||||
my $self = Connection->new( @_ );
|
||||
bless( $self, $class );
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub getMessages
|
||||
{
|
||||
my $self = shift;
|
||||
my $buffer = $self->{channel}->read();
|
||||
|
||||
return( undef ) if ( !defined($buffer) );
|
||||
|
||||
Debug( "Handling buffer '$buffer'\n" );
|
||||
my @messages = grep { s/-/|/g; 1; } split( /\r?\n/, $buffer );
|
||||
return( \@messages );
|
||||
}
|
||||
|
||||
sub putMessages
|
||||
{
|
||||
my $self = shift;
|
||||
my $messages = shift;
|
||||
|
||||
if ( @$messages )
|
||||
{
|
||||
my $buffer = join( "\n", grep{ s/\|/-/; 1; } @$messages );
|
||||
$buffer .= "\n";
|
||||
if ( !$self->{channel}->write( $buffer ) )
|
||||
{
|
||||
Error( "Unable to write buffer '".$buffer." to connection ".$self->{name}." (".$self->fileno().")\n" );
|
||||
}
|
||||
}
|
||||
return( undef );
|
||||
}
|
||||
|
||||
package main;
|
||||
|
||||
my @connections;
|
||||
push( @connections, Connection->new( name=>"Chan1", channel=>Channel::Inet->new( port=>6802 ), mode=>"rw" ) );
|
||||
push( @connections, Connection->new( name=>"Chan2", channel=>Channel::Unix->new( path=>'/tmp/test.sock' ), mode=>"rw" ) );
|
||||
#push( @connections, Connection->new( name=>"Chan3", channel=>Channel::File->new( path=>'/tmp/zmtrigger.out' ), mode=>"w" ) );
|
||||
push( @connections, Connection->new( name=>"Chan4", channel=>Channel::Serial->new( path=>'/dev/ttyS0' ), mode=>"rw" ) );
|
||||
|
||||
# ==========================================================================
|
||||
#
|
||||
@@ -112,57 +588,27 @@ my $dbh = DBI->connect( "DBI:mysql:database=".ZM_DB_NAME.";host=".ZM_DB_HOST, ZM
|
||||
$SIG{HUP} = \&status;
|
||||
|
||||
my $base_rin = '';
|
||||
foreach my $conn ( @conns )
|
||||
foreach my $connection ( @connections )
|
||||
{
|
||||
Info( "Opening conn '$conn->{name}'\n" );
|
||||
if ( $conn->{type} eq "inet" )
|
||||
{
|
||||
local *sfh;
|
||||
my $saddr = sockaddr_in( $conn->{port}, INADDR_ANY );
|
||||
socket( *sfh, PF_INET, SOCK_STREAM, getprotobyname('tcp') ) or die( "Can't open socket: $!" );
|
||||
setsockopt( *sfh, SOL_SOCKET, SO_REUSEADDR, 1 );
|
||||
bind( *sfh, $saddr ) or die( "Can't bind: $!" );
|
||||
listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" );
|
||||
$conn->{handle} = *sfh;
|
||||
}
|
||||
elsif ( $conn->{type} eq "unix" )
|
||||
{
|
||||
local *sfh;
|
||||
unlink( $conn->{path} );
|
||||
my $saddr = sockaddr_un( $conn->{path} );
|
||||
socket( *sfh, PF_UNIX, SOCK_STREAM, 0 ) or die( "Can't open socket: $!" );
|
||||
bind( *sfh, $saddr ) or die( "Can't bind: $!" );
|
||||
listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" );
|
||||
$conn->{handle} = *sfh;
|
||||
}
|
||||
elsif ( $conn->{type} eq "file" )
|
||||
{
|
||||
local *sfh;
|
||||
#sysopen( *sfh, $conn->{path}, O_NONBLOCK|O_RDONLY ) or die( "Can't sysopen: $!" );
|
||||
#open( *sfh, "<".$conn->{path} ) or die( "Can't open: $!" );
|
||||
open( *sfh, "+<".$conn->{path} ) or die( "Can't open: $!" );
|
||||
$conn->{handle} = *sfh;
|
||||
}
|
||||
else
|
||||
{
|
||||
die( "Bogus connection type '$conn->{type}' found for '$conn->{name}'" );
|
||||
}
|
||||
Info( "Opening connection '$connection->{name}'\n" );
|
||||
$connection->open();
|
||||
}
|
||||
|
||||
my @in_conns = grep { $_->{in} } @conns;
|
||||
my @out_conns = grep { $_->{out} } @conns;
|
||||
my @in_select_connections = grep { $_->input() && $_->selectable() } @connections;
|
||||
my @in_poll_connections = grep { $_->input() && !$_->selectable() } @connections;
|
||||
my @out_connections = grep { $_->output() } @connections;
|
||||
|
||||
foreach my $conn ( @in_conns )
|
||||
foreach my $connection ( @in_select_connections )
|
||||
{
|
||||
vec( $base_rin, fileno($conn->{handle}), 1 ) = 1;
|
||||
$conn->{iohandle} = $conn->{handle} if ( $conn->{type} eq "file" );
|
||||
print( "FN:".$connection->fileno()."\n" );
|
||||
vec( $base_rin, $connection->fileno(), 1 ) = 1;
|
||||
}
|
||||
|
||||
my $sigset = POSIX::SigSet->new;
|
||||
my $blockset = POSIX::SigSet->new( SIGCHLD );
|
||||
sigprocmask( SIG_BLOCK, $blockset, $sigset ) or die( "Can't block SIGCHLD: $!" );
|
||||
#my $sigset = POSIX::SigSet->new;
|
||||
#my $blockset = POSIX::SigSet->new( SIGCHLD );
|
||||
#sigprocmask( SIG_BLOCK, $blockset, $sigset ) or die( "Can't block SIGCHLD: $!" );
|
||||
|
||||
my %conns;
|
||||
my %spawned_connections;
|
||||
my %monitors;
|
||||
|
||||
my $monitor_reload_time = 0;
|
||||
@@ -171,66 +617,63 @@ $! = undef;
|
||||
my $rin = '';
|
||||
my $win = $rin;
|
||||
my $ein = $win;
|
||||
my $timeout = 0.25;
|
||||
my $timeout = SELECT_TIMEOUT;
|
||||
my %actions;
|
||||
while( 1 )
|
||||
{
|
||||
$rin = $base_rin;
|
||||
foreach my $fileno ( keys(%conns) )
|
||||
# Add the file descriptors of any spawned connections
|
||||
foreach my $fileno ( keys(%spawned_connections) )
|
||||
{
|
||||
vec( $rin, $fileno,1) = 1;
|
||||
vec( $rin, $fileno, 1 ) = 1;
|
||||
}
|
||||
|
||||
my $nfound = select( my $rout = $rin, undef, my $eout = $ein, $timeout );
|
||||
if ( $nfound > 0 )
|
||||
{
|
||||
Debug( "Got input from $nfound connections\n" );
|
||||
foreach my $conn ( @in_conns )
|
||||
foreach my $connection ( @in_select_connections )
|
||||
{
|
||||
if ( vec( $rout, fileno($conn->{handle}),1) )
|
||||
if ( vec( $rout, $connection->fileno(), 1 ) )
|
||||
{
|
||||
Debug( "Got input from connection $conn->{name} (".fileno($conn->{handle}).")\n" );
|
||||
if ( $conn->{type} eq "inet" || $conn->{type} eq "unix" )
|
||||
Debug( "Got input from connection ".$connection->name()." (".$connection->fileno().")\n" );
|
||||
if ( $connection->spawns() )
|
||||
{
|
||||
local *cfh;
|
||||
my $paddr = accept( *cfh, $conn->{handle} );
|
||||
$conn->{iohandle} = *cfh;
|
||||
$conns{fileno(*cfh)} = $conn;
|
||||
Debug( "Added new connection (".fileno(*cfh)."), ".int(keys(%conns))." conns\n" );
|
||||
my $new_connection = $connection->accept();
|
||||
$spawned_connections{$new_connection->fileno()} = $new_connection;
|
||||
Debug( "Added new spawned connection (".$new_connection->fileno()."), ".int(keys(%spawned_connections))." spawned connections\n" );
|
||||
}
|
||||
else
|
||||
{
|
||||
my $buffer;
|
||||
my $nbytes = sysread( $conn->{iohandle}, $buffer, POSIX::BUFSIZ );
|
||||
if ( !$nbytes )
|
||||
my $messages = $connection->getMessages();
|
||||
if ( defined($messages) )
|
||||
{
|
||||
die( "Got unexpected close on connection $conn->{name}" );
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug( "Got '$buffer' ($nbytes bytes)\n" );
|
||||
handleMessages( $conn, $buffer );
|
||||
foreach my $message ( @$messages )
|
||||
{
|
||||
handleMessage( $connection, $message );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
foreach my $conn ( values(%conns) )
|
||||
foreach my $connection ( values(%spawned_connections) )
|
||||
{
|
||||
if ( vec( $rout, fileno($conn->{iohandle}),1) )
|
||||
if ( vec( $rout, $connection->fileno(), 1 ) )
|
||||
{
|
||||
Debug( "Got input from connection on ".$conn->{name}." (".fileno($conn->{iohandle}).")\n" );
|
||||
my $buffer;
|
||||
my $nbytes = sysread( $conn->{iohandle}, $buffer, POSIX::BUFSIZ );
|
||||
if ( !$nbytes )
|
||||
Debug( "Got input from spawned connection ".$connection->name()." (".$connection->fileno().")\n" );
|
||||
my $messages = $connection->getMessages();
|
||||
if ( defined($messages) )
|
||||
{
|
||||
delete( $conns{fileno($conn->{iohandle})} );
|
||||
Debug( "Removed connection (".fileno($conn->{iohandle})."), ".int(keys(%conns))." conns\n" );
|
||||
close( $conn->{iohandle} );
|
||||
$conn->{iohandle} = undef;
|
||||
foreach my $message ( @$messages )
|
||||
{
|
||||
handleMessage( $connection, $message );
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Debug( "Got '$buffer' ($nbytes bytes)\n" );
|
||||
handleMessages( $conn, $buffer );
|
||||
delete( $spawned_connections{$connection->fileno()} );
|
||||
Debug( "Removed spawned connection (".$connection->fileno()."), ".int(keys(%spawned_connections))." spawned connections\n" );
|
||||
$connection->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -239,71 +682,92 @@ while( 1 )
|
||||
{
|
||||
if ( $! == EINTR )
|
||||
{
|
||||
# No comment
|
||||
# Do nothing
|
||||
}
|
||||
else
|
||||
{
|
||||
die( "Can't select: $!" );
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
# Check polled connections
|
||||
foreach my $connection ( @in_poll_connections )
|
||||
{
|
||||
if ( (time() - $monitor_reload_time) > MONITOR_RELOAD_INTERVAL )
|
||||
my $messages = $connection->getMessages();
|
||||
if ( defined($messages) )
|
||||
{
|
||||
loadMonitors();
|
||||
}
|
||||
|
||||
# Check for alarms that might have happened
|
||||
foreach my $monitor ( values(%monitors) )
|
||||
{
|
||||
my ( $state, $last_event ) = zmShmRead( $monitor, [ "shared_data:state", "shared_data:last_event" ] );
|
||||
|
||||
my $message = undef;
|
||||
if ( $state == STATE_ALARM || $state == STATE_ALERT ) # In alarm state
|
||||
foreach my $message ( @$messages )
|
||||
{
|
||||
if ( !defined($monitor->{LastEvent}) || ($last_event != $monitor->{LastEvent}) ) # A new event
|
||||
{
|
||||
$message = $monitor->{Id}."|on|".time()."|".$last_event;
|
||||
}
|
||||
else # The same one as last time, so ignore it
|
||||
{
|
||||
# Do nothing
|
||||
}
|
||||
}
|
||||
elsif ( $state == STATE_IDLE && $monitor->{LastState} > STATE_IDLE ) # Out of alarm state
|
||||
{
|
||||
$message = $monitor->{Id}."|off|".time()."|".$last_event;
|
||||
}
|
||||
elsif ( defined($monitor->{LastEvent}) && ($last_event != $monitor->{LastEvent}) ) # We've missed a whole event
|
||||
{
|
||||
$message = $monitor->{Id}."|on|".time()."|".$last_event;
|
||||
$message .= "\n";
|
||||
$message .= $monitor->{Id}."|off|".time()."|".$last_event;
|
||||
}
|
||||
$monitor->{LastState} = $state;
|
||||
$monitor->{LastEvent} = $last_event;
|
||||
|
||||
if ( $message )
|
||||
{
|
||||
foreach my $conn ( @out_conns )
|
||||
{
|
||||
sendMessages( $conn, $message ) if ( defined($conn->{iohandle}) );
|
||||
}
|
||||
handleMessage( $connection, $message );
|
||||
}
|
||||
}
|
||||
|
||||
Debug( "Checking for timed actions\n" ) if ( int(keys(%actions)) );
|
||||
my $now = time();
|
||||
foreach my $action_time ( sort( grep { $_ < $now } keys( %actions ) ) )
|
||||
}
|
||||
|
||||
# Check for alarms that might have happened
|
||||
my @out_messages;
|
||||
foreach my $monitor ( values(%monitors) )
|
||||
{
|
||||
my ( $state, $last_event ) = zmShmRead( $monitor, [ "shared_data:state", "shared_data:last_event" ] );
|
||||
|
||||
#print( "$monitor->{Id}: S:$state, LE:$last_event\n" );
|
||||
#print( "$monitor->{Id}: mS:$monitor->{LastState}, mLE:$monitor->{LastEvent}\n" );
|
||||
if ( $state == STATE_ALARM || $state == STATE_ALERT ) # In alarm state
|
||||
{
|
||||
Info( "Found actions expiring at $action_time\n" );
|
||||
foreach my $action ( @{$actions{$action_time}} )
|
||||
if ( !defined($monitor->{LastEvent}) || ($last_event != $monitor->{LastEvent}) ) # A new event
|
||||
{
|
||||
Info( "Found action '$action'\n" );
|
||||
handleMessage( $action );
|
||||
push( @out_messages, $monitor->{Id}."|on|".time()."|".$last_event );
|
||||
}
|
||||
else # The same one as last time, so ignore it
|
||||
{
|
||||
# Do nothing
|
||||
}
|
||||
delete( $actions{$action_time} );
|
||||
}
|
||||
elsif ( $state == STATE_IDLE && $monitor->{LastState} > STATE_IDLE ) # Out of alarm state
|
||||
{
|
||||
push( @out_messages, $monitor->{Id}."|off|".time()."|".$last_event );
|
||||
}
|
||||
elsif ( defined($monitor->{LastEvent}) && ($last_event != $monitor->{LastEvent}) ) # We've missed a whole event
|
||||
{
|
||||
push( @out_messages, $monitor->{Id}."|on|".time()."|".$last_event );
|
||||
push( @out_messages, $monitor->{Id}."|off|".time()."|".$last_event );
|
||||
}
|
||||
$monitor->{LastState} = $state;
|
||||
$monitor->{LastEvent} = $last_event;
|
||||
}
|
||||
foreach my $connection ( @out_connections )
|
||||
{
|
||||
if ( $connection->canWrite() )
|
||||
{
|
||||
$connection->putMessages( \@out_messages );
|
||||
}
|
||||
}
|
||||
foreach my $connection ( values(%spawned_connections) )
|
||||
{
|
||||
if ( $connection->canWrite() )
|
||||
{
|
||||
$connection->putMessages( \@out_messages );
|
||||
}
|
||||
}
|
||||
|
||||
Debug( "Checking for timed actions\n" ) if ( int(keys(%actions)) );
|
||||
my $now = time();
|
||||
foreach my $action_time ( sort( grep { $_ < $now } keys( %actions ) ) )
|
||||
{
|
||||
Info( "Found actions expiring at $action_time\n" );
|
||||
foreach my $action ( @{$actions{$action_time}} )
|
||||
{
|
||||
my $connection = $action->{connection};
|
||||
my $message = $action->{message};
|
||||
Info( "Found action '$message'\n" );
|
||||
handleMessage( $connection, $message );
|
||||
}
|
||||
delete( $actions{$action_time} );
|
||||
}
|
||||
|
||||
# If necessary reload monitors
|
||||
if ( (time() - $monitor_reload_time) > MONITOR_RELOAD_INTERVAL )
|
||||
{
|
||||
loadMonitors();
|
||||
}
|
||||
}
|
||||
Info( "Trigger daemon exiting\n" );
|
||||
@@ -312,6 +776,8 @@ exit;
|
||||
sub loadMonitors
|
||||
{
|
||||
Debug( "Loading monitors\n" );
|
||||
$monitor_reload_time = time();
|
||||
|
||||
my %new_monitors = ();
|
||||
|
||||
my $sql = "select * from Monitors where find_in_set( Function, 'Modect,Mocord,Nodect' )";
|
||||
@@ -340,70 +806,12 @@ sub loadMonitors
|
||||
$new_monitors{$monitor->{Id}} = $monitor;
|
||||
}
|
||||
%monitors = %new_monitors;
|
||||
$monitor_reload_time = time();
|
||||
}
|
||||
|
||||
sub sendMessages
|
||||
{
|
||||
my $conn = shift;
|
||||
my $buffer = shift;
|
||||
#chomp( $buffer );
|
||||
|
||||
Debug( "Sending buffer '$buffer'\n" );
|
||||
foreach my $message ( split( /\r?\n/, $buffer ) )
|
||||
{
|
||||
next if ( !$message );
|
||||
Debug( "Sending message '$message'\n" );
|
||||
my $osub = $conn->{osub};
|
||||
if ( defined($osub) )
|
||||
{
|
||||
$message = &$osub( $message );
|
||||
Debug( "Converted message '$message'\n" );
|
||||
}
|
||||
sendMessage( $conn, $message );
|
||||
}
|
||||
}
|
||||
|
||||
sub sendMessage
|
||||
{
|
||||
my $conn = shift;
|
||||
my $message = shift;
|
||||
|
||||
$message .= "\n";
|
||||
my $nbytes = syswrite( $conn->{iohandle}, $message );
|
||||
if ( !defined( $nbytes) || $nbytes < length($message) )
|
||||
{
|
||||
Error( "Unable to write message '".$message." to connection $conn->{name} (".fileno($conn->{handle})."), expected ".length($message)." bytes, got ".$nbytes.": $!\n" );
|
||||
}
|
||||
}
|
||||
|
||||
sub handleMessages
|
||||
{
|
||||
my $conn = shift;
|
||||
my $buffer = shift;
|
||||
#chomp( $buffer );
|
||||
|
||||
Debug( "Handling buffer '$buffer'\n" );
|
||||
foreach my $message ( split( /\r?\n/, $buffer ) )
|
||||
{
|
||||
next if ( !$message );
|
||||
Debug( "Handling message '$message'\n" );
|
||||
print( Dumper( $conn ) );
|
||||
my $isub = $conn->{isub};
|
||||
print( Dumper( $isub ) );
|
||||
if ( defined($isub) )
|
||||
{
|
||||
$message = &$isub( $message );
|
||||
Debug( "Converted message '$message'\n" );
|
||||
}
|
||||
handleMessage( $message );
|
||||
}
|
||||
}
|
||||
|
||||
sub handleMessage
|
||||
{
|
||||
my $connection = shift;
|
||||
my $message = shift;
|
||||
#chomp( $buffer );
|
||||
|
||||
my ( $id, $action, $score, $cause, $text, $showtext ) = split( /\|/, $message );
|
||||
$score = 0 if ( !defined($score) );
|
||||
@@ -445,7 +853,7 @@ sub handleMessage
|
||||
{
|
||||
$action_array = $actions{$action_time} = [];
|
||||
}
|
||||
push( @$action_array, $action_text );
|
||||
push( @$action_array, { connection=>$connection, message=>$action_text } );
|
||||
Debug( "Added timed event '$action_text', expires at $action_time (+$delay secs)\n" );
|
||||
}
|
||||
}
|
||||
@@ -469,13 +877,14 @@ sub handleMessage
|
||||
if ( $delay )
|
||||
{
|
||||
my $action_time = time()+$delay;
|
||||
my $action_text = $id."|cancel|0|".$cause."|".$text;
|
||||
#my $action_text = $id."|cancel|0|".$cause."|".$text;
|
||||
my $action_text = $id."|cancel";
|
||||
my $action_array = $actions{$action_time};
|
||||
if ( !$action_array )
|
||||
{
|
||||
$action_array = $actions{$action_time} = [];
|
||||
}
|
||||
push( @$action_array, $action_text );
|
||||
push( @$action_array, { connection=>$connection, message=>$action_text } );
|
||||
Debug( "Added timed event '$action_text', expires at $action_time (+$delay secs)\n" );
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user