diff --git a/scripts/zmtrigger.pl b/scripts/zmtrigger.pl index c0a65b74f..afc32d811 100644 --- a/scripts/zmtrigger.pl +++ b/scripts/zmtrigger.pl @@ -38,41 +38,517 @@ use constant DBG_LEVEL => 1; # 0 is errors, warnings and info only, > 0 for debu use constant MAX_CONNECT_DELAY => 10; use constant MONITOR_RELOAD_INTERVAL => 300; +use constant SELECT_TIMEOUT => 0.25; -# Now define the trigger connections, can be inet socket, unix socket or file based -# The isub field should point to a subroutine to convert input messages if necessary -# The osub field should point to a subroutine to convert output messages if necessary +# +# Define classes for any channels that triggers may go in and/or out over +# -my @conns = ( +# Base channel class +package Channel; + +use ZoneMinder::Debug; + +our $AUTOLOAD; + +sub new +{ + my $class = shift; + my $self = {}; + $self->{readable} = !undef; + $self->{writeable} = !undef; + $self->{selectable} = undef; + $self->{state} = 'closed'; + bless( $self, $class ); + return $self; +} + +sub clone +{ + my $self = shift; + my $clone = { %$self }; + bless $clone, ref $self; +} + +sub open() +{ + my $self = shift; + my $class = ref($self) or die( "Can't get class for non object $self" ); + die( "Abstract base class method called for object of class $class" ); +} + +sub close() +{ + my $self = shift; + my $class = ref($self) or die( "Can't get class for non object $self" ); + die( "Abstract base class method called for object of class $class" ); +} + +sub getState() +{ + my $self = shift; + return( $self->{state} ); +} + +sub isOpen() +{ + my $self = shift; + return( $self->{state} eq "open" ); +} + +sub isConnected() +{ + my $self = shift; + return( $self->{state} eq "connected" ); +} + +sub DESTROY +{ +} + +sub AUTOLOAD +{ + my $self = shift; + my $class = ref($self) || die( "$self not object" ); + my $name = $AUTOLOAD; + $name =~ s/.*://; + if ( !exists($self->{$name}) ) { - name => "S1", - type => "inet", - port => "6802", - in => !undef, - out => !undef, - isub => sub { $_[0] =~ s/-/|/g; return( $_[0] ); }, - osub => sub { $_[0] =~ s/\|/-/g; return( $_[0] ); }, - ids => [ 1 ], - }, + die( "Can't access $name member of object of class $class" ); + } + return( $self->{$name} ); +} + +# Handle based channel +package Channel::Handle; +our @ISA = qw(Channel); + +use ZoneMinder::Debug qw(:all); +use POSIX; + +sub new +{ + my $class = shift; + my $port = shift; + my $self = Channel->new(); + $self->{handle} = undef; + bless( $self, $class ); + return $self; +} + +sub close() +{ + my $self = shift; + close( $self->{handle} ); + $self->{state} = 'closed'; + $self->{handle} = undef; +} + +sub read() +{ + my $self = shift; + my $buffer; + my $nbytes = sysread( $self->{handle}, $buffer, POSIX::BUFSIZ ); + if ( !$nbytes ) { - name => "S2", - type => "unix", - path => "/tmp/test.sock", - in => !undef, - out => !undef, - isub => undef, - osub => undef, - }, + return( undef ); + } + Debug( "Read '$buffer' ($nbytes bytes)\n" ); + return( $buffer ); +} + +sub write() +{ + my $self = shift; + my $buffer = shift; + my $nbytes = syswrite( $self->{handle}, $buffer ); + if ( !defined( $nbytes) || $nbytes < length($buffer) ) { - name => "S3", - type => "file", - path => "/dev/ttyS0", - in => undef, - out => undef, - isub => undef, - osub => undef, - }, -); + Error( "Unable to write buffer '".$buffer.", expected ".length($buffer)." bytes, sent ".$nbytes.": $!\n" ); + return( undef ); + } + Debug( "Wrote '$buffer' ($nbytes bytes)\n" ); + return( !undef ); +} + +sub fileno() +{ + my $self = shift; + return( defined($self->{handle})?fileno($self->{handle}):-1 ); +} + +# Spawning selectable channels +package Channel::Spawning; +our @ISA = qw(Channel::Handle); + +sub new +{ + my $class = shift; + my $port = shift; + my $self = Channel::Handle->new(); + $self->{spawns} = !undef; + bless( $self, $class ); + return $self; +} + +# Inet TCP socket channel +package Channel::Inet; +our @ISA = qw(Channel::Spawning); + +use Socket; + +sub new +{ + my $class = shift; + my %params = @_; + my $self = Channel::Spawning->new(); + $self->{selectable} = !undef; + $self->{port} = $params{port}; + bless( $self, $class ); + return $self; +} + +sub open() +{ + my $self = shift; + local *sfh; + my $saddr = sockaddr_in( $self->{port}, INADDR_ANY ); + socket( *sfh, PF_INET, SOCK_STREAM, getprotobyname('tcp') ) or die( "Can't open socket: $!" ); + setsockopt( *sfh, SOL_SOCKET, SO_REUSEADDR, 1 ); + bind( *sfh, $saddr ) or die( "Can't bind: $!" ); + listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" ); + $self->{state} = 'open'; + $self->{handle} = *sfh; +} + +sub _spawn( $ ) +{ + my $self = shift; + my $new_handle = shift; + my $clone = $self->clone(); + $clone->{handle} = $new_handle; + $clone->{state} = 'connected'; + return( $clone ); +} + +sub accept() +{ + my $self = shift; + local *cfh; + my $paddr = accept( *cfh, $self->{handle} ); + return( $self->_spawn( *cfh ) ); +} + +# Unix socket channel +package Channel::Unix; +our @ISA = qw(Channel::Spawning); + +use Socket; + +sub new +{ + my $class = shift; + my %params = @_; + my $self = Channel->new; + $self->{selectable} = !undef; + $self->{path} = $params{path}; + bless( $self, $class ); + return $self; +} + +sub open() +{ + my $self = shift; + local *sfh; + unlink( $self->{path} ); + my $saddr = sockaddr_un( $self->{path} ); + socket( *sfh, PF_UNIX, SOCK_STREAM, 0 ) or die( "Can't open socket: $!" ); + bind( *sfh, $saddr ) or die( "Can't bind: $!" ); + listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" ); + $self->{handle} = *sfh; +} + +# Simple file channel +package Channel::File; +our @ISA = qw(Channel::Handle); + +use Fcntl; + +sub new +{ + my $class = shift; + my %params = @_; + my $self = Channel::Handle->new; + $self->{path} = $params{path}; + bless( $self, $class ); + return $self; +} + +sub open() +{ + my $self = shift; + local *sfh; + #sysopen( *sfh, $conn->{path}, O_NONBLOCK|O_RDONLY ) or die( "Can't sysopen: $!" ); + #open( *sfh, "<".$conn->{path} ) or die( "Can't open: $!" ); + open( *sfh, "+<".$self->{path} ) or die( "Can't open: $!" ); + $self->{state} = 'open'; + $self->{handle} = *sfh; +} + +# Serial device channel +package Channel::Serial; +our @ISA = qw(Channel); + +use ZoneMinder::Debug qw(:all); +use Device::SerialPort; + +sub new +{ + my $class = shift; + my %params = @_; + my $self = Channel->new; + $self->{path} = $params{path}; + bless( $self, $class ); + return $self; +} + +sub open() +{ + my $self = shift; + my $device = new Device::SerialPort( $self->{path} ); + $device->baudrate(9600); + $device->databits(8); + $device->parity('none'); + $device->stopbits(1); + $device->handshake('none'); + + $device->read_const_time(50); + $device->read_char_time(10); + + $self->{device} = $device; + $self->{state} = 'open'; + $self->{state} = 'connected'; +} + +sub close() +{ + my $self = shift; + $self->{device}->close(); + $self->{state} = 'closed'; +} + +sub read() +{ + my $self = shift; + my $buffer = $self->{device}->lookfor(); + if ( !$buffer || !length($buffer) ) + { + return( undef ); + } + Debug( "Read '$buffer' (".length($buffer)." bytes)\n" ); + return( $buffer ); +} + +sub write() +{ + my $self = shift; + my $buffer = shift; + my $nbytes = $self->{device}->write( $buffer ); + $self->{device}->write_drain(); + if ( !defined( $nbytes) || $nbytes < length($buffer) ) + { + Error( "Unable to write buffer '".$buffer.", expected ".length($buffer)." bytes, sent ".$nbytes.": $!\n" ); + return( undef ); + } + Debug( "Wrote '$buffer' ($nbytes bytes)\n" ); + return( !undef ); +} + + +package Connection; +use ZoneMinder::Debug; + +our $AUTOLOAD; + +sub new +{ + my $class = shift; + my %params = @_; + my $self = {}; + $self->{name} = $params{name}; + $self->{channel} = $params{channel}; + $self->{input} = $params{mode} =~ /r/i; + $self->{output} = $params{mode} =~ /w/i; + bless( $self, $class ); + return $self; +} + +sub clone +{ + my $self = shift; + my $clone = { %$self }; + bless $clone, ref $self; + return( $clone ); +} + +sub _spawn( $ ) +{ + my $self = shift; + my $new_channel = shift; + my $clone = $self->clone(); + $clone->{channel} = $new_channel; + return( $clone ); +} + +sub accept() +{ + my $self = shift; + my $new_channel = $self->{channel}->accept(); + return( $self->_spawn( $new_channel ) ); +} + +sub open() +{ + my $self = shift; + return( $self->{channel}->open() ); +} + +sub close() +{ + my $self = shift; + return( $self->{channel}->close() ); +} + +sub fileno() +{ + my $self = shift; + return( $self->{channel}->fileno() ); +} + +sub isOpen() +{ + my $self = shift; + return( $self->{channel}->isOpen() ); +} + +sub isConnected() +{ + my $self = shift; + return( $self->{channel}->isConnected() ); +} + +sub canRead() +{ + my $self = shift; + return( $self->{input} && $self->isConnected() ); +} + +sub canWrite() +{ + my $self = shift; + return( $self->{output} && $self->isConnected() ); +} + +sub getMessages +{ + my $self = shift; + my $buffer = $self->{channel}->read(); + + return( undef ) if ( !defined($buffer) ); + + my @messages = split( /\r?\n/, $buffer ); + return( \@messages ); +} + +sub putMessages +{ + my $self = shift; + my $messages = shift; + + if ( @$messages ) + { + my $buffer = join( "\n", @$messages ); + $buffer .= "\n"; + if ( !$self->{channel}->write( $buffer ) ) + { + Error( "Unable to write buffer '".$buffer." to connection ".$self->{name}." (".$self->fileno().")\n" ); + } + } + return( undef ); +} + +sub DESTROY +{ +} + +sub AUTOLOAD +{ + my $self = shift; + my $class = ref($self) || die( "$self not object" ); + my $name = $AUTOLOAD; + $name =~ s/.*://; + if ( exists($self->{$name}) ) + { + return( $self->{$name} ); + } + elsif ( defined($self->{channel}) ) + { + if ( exists($self->{channel}->{$name}) ) + { + return( $self->{channel}->{$name} ); + } + } + die( "Can't access $name member of object of class $class" ); +} + +package Connection::Special; +our @ISA = qw(Connection); + +sub new +{ + my $class = shift; + my $path = shift; + my $self = Connection->new( @_ ); + bless( $self, $class ); + return $self; +} + +sub getMessages +{ + my $self = shift; + my $buffer = $self->{channel}->read(); + + return( undef ) if ( !defined($buffer) ); + + Debug( "Handling buffer '$buffer'\n" ); + my @messages = grep { s/-/|/g; 1; } split( /\r?\n/, $buffer ); + return( \@messages ); +} + +sub putMessages +{ + my $self = shift; + my $messages = shift; + + if ( @$messages ) + { + my $buffer = join( "\n", grep{ s/\|/-/; 1; } @$messages ); + $buffer .= "\n"; + if ( !$self->{channel}->write( $buffer ) ) + { + Error( "Unable to write buffer '".$buffer." to connection ".$self->{name}." (".$self->fileno().")\n" ); + } + } + return( undef ); +} + +package main; + +my @connections; +push( @connections, Connection->new( name=>"Chan1", channel=>Channel::Inet->new( port=>6802 ), mode=>"rw" ) ); +push( @connections, Connection->new( name=>"Chan2", channel=>Channel::Unix->new( path=>'/tmp/test.sock' ), mode=>"rw" ) ); +#push( @connections, Connection->new( name=>"Chan3", channel=>Channel::File->new( path=>'/tmp/zmtrigger.out' ), mode=>"w" ) ); +push( @connections, Connection->new( name=>"Chan4", channel=>Channel::Serial->new( path=>'/dev/ttyS0' ), mode=>"rw" ) ); # ========================================================================== # @@ -112,57 +588,27 @@ my $dbh = DBI->connect( "DBI:mysql:database=".ZM_DB_NAME.";host=".ZM_DB_HOST, ZM $SIG{HUP} = \&status; my $base_rin = ''; -foreach my $conn ( @conns ) +foreach my $connection ( @connections ) { - Info( "Opening conn '$conn->{name}'\n" ); - if ( $conn->{type} eq "inet" ) - { - local *sfh; - my $saddr = sockaddr_in( $conn->{port}, INADDR_ANY ); - socket( *sfh, PF_INET, SOCK_STREAM, getprotobyname('tcp') ) or die( "Can't open socket: $!" ); - setsockopt( *sfh, SOL_SOCKET, SO_REUSEADDR, 1 ); - bind( *sfh, $saddr ) or die( "Can't bind: $!" ); - listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" ); - $conn->{handle} = *sfh; - } - elsif ( $conn->{type} eq "unix" ) - { - local *sfh; - unlink( $conn->{path} ); - my $saddr = sockaddr_un( $conn->{path} ); - socket( *sfh, PF_UNIX, SOCK_STREAM, 0 ) or die( "Can't open socket: $!" ); - bind( *sfh, $saddr ) or die( "Can't bind: $!" ); - listen( *sfh, SOMAXCONN ) or die( "Can't listen: $!" ); - $conn->{handle} = *sfh; - } - elsif ( $conn->{type} eq "file" ) - { - local *sfh; - #sysopen( *sfh, $conn->{path}, O_NONBLOCK|O_RDONLY ) or die( "Can't sysopen: $!" ); - #open( *sfh, "<".$conn->{path} ) or die( "Can't open: $!" ); - open( *sfh, "+<".$conn->{path} ) or die( "Can't open: $!" ); - $conn->{handle} = *sfh; - } - else - { - die( "Bogus connection type '$conn->{type}' found for '$conn->{name}'" ); - } + Info( "Opening connection '$connection->{name}'\n" ); + $connection->open(); } -my @in_conns = grep { $_->{in} } @conns; -my @out_conns = grep { $_->{out} } @conns; +my @in_select_connections = grep { $_->input() && $_->selectable() } @connections; +my @in_poll_connections = grep { $_->input() && !$_->selectable() } @connections; +my @out_connections = grep { $_->output() } @connections; -foreach my $conn ( @in_conns ) +foreach my $connection ( @in_select_connections ) { - vec( $base_rin, fileno($conn->{handle}), 1 ) = 1; - $conn->{iohandle} = $conn->{handle} if ( $conn->{type} eq "file" ); + print( "FN:".$connection->fileno()."\n" ); + vec( $base_rin, $connection->fileno(), 1 ) = 1; } -my $sigset = POSIX::SigSet->new; -my $blockset = POSIX::SigSet->new( SIGCHLD ); -sigprocmask( SIG_BLOCK, $blockset, $sigset ) or die( "Can't block SIGCHLD: $!" ); +#my $sigset = POSIX::SigSet->new; +#my $blockset = POSIX::SigSet->new( SIGCHLD ); +#sigprocmask( SIG_BLOCK, $blockset, $sigset ) or die( "Can't block SIGCHLD: $!" ); -my %conns; +my %spawned_connections; my %monitors; my $monitor_reload_time = 0; @@ -171,66 +617,63 @@ $! = undef; my $rin = ''; my $win = $rin; my $ein = $win; -my $timeout = 0.25; +my $timeout = SELECT_TIMEOUT; my %actions; while( 1 ) { $rin = $base_rin; - foreach my $fileno ( keys(%conns) ) + # Add the file descriptors of any spawned connections + foreach my $fileno ( keys(%spawned_connections) ) { - vec( $rin, $fileno,1) = 1; + vec( $rin, $fileno, 1 ) = 1; } + my $nfound = select( my $rout = $rin, undef, my $eout = $ein, $timeout ); if ( $nfound > 0 ) { Debug( "Got input from $nfound connections\n" ); - foreach my $conn ( @in_conns ) + foreach my $connection ( @in_select_connections ) { - if ( vec( $rout, fileno($conn->{handle}),1) ) + if ( vec( $rout, $connection->fileno(), 1 ) ) { - Debug( "Got input from connection $conn->{name} (".fileno($conn->{handle}).")\n" ); - if ( $conn->{type} eq "inet" || $conn->{type} eq "unix" ) + Debug( "Got input from connection ".$connection->name()." (".$connection->fileno().")\n" ); + if ( $connection->spawns() ) { - local *cfh; - my $paddr = accept( *cfh, $conn->{handle} ); - $conn->{iohandle} = *cfh; - $conns{fileno(*cfh)} = $conn; - Debug( "Added new connection (".fileno(*cfh)."), ".int(keys(%conns))." conns\n" ); + my $new_connection = $connection->accept(); + $spawned_connections{$new_connection->fileno()} = $new_connection; + Debug( "Added new spawned connection (".$new_connection->fileno()."), ".int(keys(%spawned_connections))." spawned connections\n" ); } else { - my $buffer; - my $nbytes = sysread( $conn->{iohandle}, $buffer, POSIX::BUFSIZ ); - if ( !$nbytes ) + my $messages = $connection->getMessages(); + if ( defined($messages) ) { - die( "Got unexpected close on connection $conn->{name}" ); - } - else - { - Debug( "Got '$buffer' ($nbytes bytes)\n" ); - handleMessages( $conn, $buffer ); + foreach my $message ( @$messages ) + { + handleMessage( $connection, $message ); + } } } } } - foreach my $conn ( values(%conns) ) + foreach my $connection ( values(%spawned_connections) ) { - if ( vec( $rout, fileno($conn->{iohandle}),1) ) + if ( vec( $rout, $connection->fileno(), 1 ) ) { - Debug( "Got input from connection on ".$conn->{name}." (".fileno($conn->{iohandle}).")\n" ); - my $buffer; - my $nbytes = sysread( $conn->{iohandle}, $buffer, POSIX::BUFSIZ ); - if ( !$nbytes ) + Debug( "Got input from spawned connection ".$connection->name()." (".$connection->fileno().")\n" ); + my $messages = $connection->getMessages(); + if ( defined($messages) ) { - delete( $conns{fileno($conn->{iohandle})} ); - Debug( "Removed connection (".fileno($conn->{iohandle})."), ".int(keys(%conns))." conns\n" ); - close( $conn->{iohandle} ); - $conn->{iohandle} = undef; + foreach my $message ( @$messages ) + { + handleMessage( $connection, $message ); + } } else { - Debug( "Got '$buffer' ($nbytes bytes)\n" ); - handleMessages( $conn, $buffer ); + delete( $spawned_connections{$connection->fileno()} ); + Debug( "Removed spawned connection (".$connection->fileno()."), ".int(keys(%spawned_connections))." spawned connections\n" ); + $connection->close(); } } } @@ -239,71 +682,92 @@ while( 1 ) { if ( $! == EINTR ) { - # No comment + # Do nothing } else { die( "Can't select: $!" ); } } - else + + # Check polled connections + foreach my $connection ( @in_poll_connections ) { - if ( (time() - $monitor_reload_time) > MONITOR_RELOAD_INTERVAL ) + my $messages = $connection->getMessages(); + if ( defined($messages) ) { - loadMonitors(); - } - - # Check for alarms that might have happened - foreach my $monitor ( values(%monitors) ) - { - my ( $state, $last_event ) = zmShmRead( $monitor, [ "shared_data:state", "shared_data:last_event" ] ); - - my $message = undef; - if ( $state == STATE_ALARM || $state == STATE_ALERT ) # In alarm state + foreach my $message ( @$messages ) { - if ( !defined($monitor->{LastEvent}) || ($last_event != $monitor->{LastEvent}) ) # A new event - { - $message = $monitor->{Id}."|on|".time()."|".$last_event; - } - else # The same one as last time, so ignore it - { - # Do nothing - } - } - elsif ( $state == STATE_IDLE && $monitor->{LastState} > STATE_IDLE ) # Out of alarm state - { - $message = $monitor->{Id}."|off|".time()."|".$last_event; - } - elsif ( defined($monitor->{LastEvent}) && ($last_event != $monitor->{LastEvent}) ) # We've missed a whole event - { - $message = $monitor->{Id}."|on|".time()."|".$last_event; - $message .= "\n"; - $message .= $monitor->{Id}."|off|".time()."|".$last_event; - } - $monitor->{LastState} = $state; - $monitor->{LastEvent} = $last_event; - - if ( $message ) - { - foreach my $conn ( @out_conns ) - { - sendMessages( $conn, $message ) if ( defined($conn->{iohandle}) ); - } + handleMessage( $connection, $message ); } } - - Debug( "Checking for timed actions\n" ) if ( int(keys(%actions)) ); - my $now = time(); - foreach my $action_time ( sort( grep { $_ < $now } keys( %actions ) ) ) + } + + # Check for alarms that might have happened + my @out_messages; + foreach my $monitor ( values(%monitors) ) + { + my ( $state, $last_event ) = zmShmRead( $monitor, [ "shared_data:state", "shared_data:last_event" ] ); + + #print( "$monitor->{Id}: S:$state, LE:$last_event\n" ); + #print( "$monitor->{Id}: mS:$monitor->{LastState}, mLE:$monitor->{LastEvent}\n" ); + if ( $state == STATE_ALARM || $state == STATE_ALERT ) # In alarm state { - Info( "Found actions expiring at $action_time\n" ); - foreach my $action ( @{$actions{$action_time}} ) + if ( !defined($monitor->{LastEvent}) || ($last_event != $monitor->{LastEvent}) ) # A new event { - Info( "Found action '$action'\n" ); - handleMessage( $action ); + push( @out_messages, $monitor->{Id}."|on|".time()."|".$last_event ); + } + else # The same one as last time, so ignore it + { + # Do nothing } - delete( $actions{$action_time} ); } + elsif ( $state == STATE_IDLE && $monitor->{LastState} > STATE_IDLE ) # Out of alarm state + { + push( @out_messages, $monitor->{Id}."|off|".time()."|".$last_event ); + } + elsif ( defined($monitor->{LastEvent}) && ($last_event != $monitor->{LastEvent}) ) # We've missed a whole event + { + push( @out_messages, $monitor->{Id}."|on|".time()."|".$last_event ); + push( @out_messages, $monitor->{Id}."|off|".time()."|".$last_event ); + } + $monitor->{LastState} = $state; + $monitor->{LastEvent} = $last_event; + } + foreach my $connection ( @out_connections ) + { + if ( $connection->canWrite() ) + { + $connection->putMessages( \@out_messages ); + } + } + foreach my $connection ( values(%spawned_connections) ) + { + if ( $connection->canWrite() ) + { + $connection->putMessages( \@out_messages ); + } + } + + Debug( "Checking for timed actions\n" ) if ( int(keys(%actions)) ); + my $now = time(); + foreach my $action_time ( sort( grep { $_ < $now } keys( %actions ) ) ) + { + Info( "Found actions expiring at $action_time\n" ); + foreach my $action ( @{$actions{$action_time}} ) + { + my $connection = $action->{connection}; + my $message = $action->{message}; + Info( "Found action '$message'\n" ); + handleMessage( $connection, $message ); + } + delete( $actions{$action_time} ); + } + + # If necessary reload monitors + if ( (time() - $monitor_reload_time) > MONITOR_RELOAD_INTERVAL ) + { + loadMonitors(); } } Info( "Trigger daemon exiting\n" ); @@ -312,6 +776,8 @@ exit; sub loadMonitors { Debug( "Loading monitors\n" ); + $monitor_reload_time = time(); + my %new_monitors = (); my $sql = "select * from Monitors where find_in_set( Function, 'Modect,Mocord,Nodect' )"; @@ -340,70 +806,12 @@ sub loadMonitors $new_monitors{$monitor->{Id}} = $monitor; } %monitors = %new_monitors; - $monitor_reload_time = time(); -} - -sub sendMessages -{ - my $conn = shift; - my $buffer = shift; - #chomp( $buffer ); - - Debug( "Sending buffer '$buffer'\n" ); - foreach my $message ( split( /\r?\n/, $buffer ) ) - { - next if ( !$message ); - Debug( "Sending message '$message'\n" ); - my $osub = $conn->{osub}; - if ( defined($osub) ) - { - $message = &$osub( $message ); - Debug( "Converted message '$message'\n" ); - } - sendMessage( $conn, $message ); - } -} - -sub sendMessage -{ - my $conn = shift; - my $message = shift; - - $message .= "\n"; - my $nbytes = syswrite( $conn->{iohandle}, $message ); - if ( !defined( $nbytes) || $nbytes < length($message) ) - { - Error( "Unable to write message '".$message." to connection $conn->{name} (".fileno($conn->{handle})."), expected ".length($message)." bytes, got ".$nbytes.": $!\n" ); - } -} - -sub handleMessages -{ - my $conn = shift; - my $buffer = shift; - #chomp( $buffer ); - - Debug( "Handling buffer '$buffer'\n" ); - foreach my $message ( split( /\r?\n/, $buffer ) ) - { - next if ( !$message ); - Debug( "Handling message '$message'\n" ); - print( Dumper( $conn ) ); - my $isub = $conn->{isub}; - print( Dumper( $isub ) ); - if ( defined($isub) ) - { - $message = &$isub( $message ); - Debug( "Converted message '$message'\n" ); - } - handleMessage( $message ); - } } sub handleMessage { + my $connection = shift; my $message = shift; - #chomp( $buffer ); my ( $id, $action, $score, $cause, $text, $showtext ) = split( /\|/, $message ); $score = 0 if ( !defined($score) ); @@ -445,7 +853,7 @@ sub handleMessage { $action_array = $actions{$action_time} = []; } - push( @$action_array, $action_text ); + push( @$action_array, { connection=>$connection, message=>$action_text } ); Debug( "Added timed event '$action_text', expires at $action_time (+$delay secs)\n" ); } } @@ -469,13 +877,14 @@ sub handleMessage if ( $delay ) { my $action_time = time()+$delay; - my $action_text = $id."|cancel|0|".$cause."|".$text; + #my $action_text = $id."|cancel|0|".$cause."|".$text; + my $action_text = $id."|cancel"; my $action_array = $actions{$action_time}; if ( !$action_array ) { $action_array = $actions{$action_time} = []; } - push( @$action_array, $action_text ); + push( @$action_array, { connection=>$connection, message=>$action_text } ); Debug( "Added timed event '$action_text', expires at $action_time (+$delay secs)\n" ); } }