# ==================================================================
# Gossamer Threads Module Library - http://gossamer-threads.com/
#
#   GT::IPC::Run::Win32
#   Author  : Scott Beck
#   $Id: Win32.pm,v 1.14 2004/01/13 01:35:17 jagerman Exp $
# 
# Copyright (c) 2004 Gossamer Threads Inc.  All Rights Reserved.
# ==================================================================

package GT::IPC::Run::Win32;

use strict;
use vars qw/$EVENTS $ERROR_MESSAGE/;
use base 'GT::Base';

use POSIX qw(fcntl_h errno_h :sys_wait_h);
use GT::Lock qw/lock unlock/;
use Win32;
use Win32::Process;
use Win32::Mutex;
# Maximum number of bytes requested by each sysread() call in do_one_loop.
sub READ_BLOCK () { 512 }

# What Win32 module exports this?
# Winsock "operation would block" error number (WSAEWOULDBLOCK). It is
# hard-coded here and compared against $! in do_one_loop.
sub WSAEWOULDBLOCK () { 10035 }

# Assigned at run time, so this replaces the GT::Base entry that
# "use base" placed in @ISA at compile time.
@GT::IPC::Run::Win32::ISA = qw(GT::IPC::Run);

# NOTE(review): presumably tells GT::Base which package's error messages to
# use for this class -- confirm against GT::Base.
$ERROR_MESSAGE = 'GT::IPC::Run';

sub execute {
# ------------------------------------------------------------------------
# Starts the child held in $self->{current_child}, moves it into
# $self->{children} keyed by its pid, and returns that pid.
#
# A program given as a plain string or an array reference is launched with
# fork_exec(); any other kind of reference is handed to fork_code() and the
# child is flagged as forked.
#
    my $self    = shift;
    my $child   = $self->{current_child};
    my $program = $child->program;

    if (ref $program and ref($program) ne 'ARRAY') {
        # Anything that is a reference but not an array goes through fork()
        $child->pid($self->fork_code);
        $child->forked(1);
    }
    else {
        # Command line: started as a real Win32 process
        my $process = $self->fork_exec;
        $child->pid($process->GetProcessID);
        $child->process($process);
    }

    my $pid = $child->pid;
    $self->{children}{$pid} = delete $self->{current_child};
    return $pid;
}

sub fork_exec {
# ------------------------------------------------------------------------
# Called on Win32 systems when wanting to exec() a process. This replaces
# forking and executing, because filehandle inheritance does not work when
# exec()ing after a fork. Instead, this process's own STDIN/STDOUT/STDERR
# are temporarily pointed at the child's pipe ends, the program is started
# with Win32::Process::Create (handle inheritance enabled), and the
# standard handles are then put back.
#
# Reads the child object from $self->{current_child}. If the child's stdin
# is a scalar reference, its contents are written to stdin_write once the
# process has been created.
#
# Returns the Win32::Process object. Dies if a pipe end has no fileno, a
# redirect fails, or the process cannot be created; in every one of those
# cases the standard handles are restored before the exception is thrown.
#
    my $self = shift;

    my $child   = $self->{current_child};
    my $process = '';
    my $program = ref($child->program) eq 'ARRAY'
        ? $child->program
        : [split ' ', $child->program];

    # Keep copies of our own standard handles so they can be put back. These
    # are deliberately unchecked (best effort), as is the restore below.
    open STDOUT_SAVE, ">&STDOUT";
    open STDERR_SAVE, ">&STDERR";
    open STDIN_SAVE, "<&STDIN";

    my $restore = sub {
        open STDOUT, ">&STDOUT_SAVE";
        open STDERR, ">&STDERR_SAVE";
        open STDIN, "<&STDIN_SAVE";
    };

    # The message is built by the caller *before* this runs, so $! and the
    # Win32 last-error value are captured before the restoring open() calls
    # get a chance to overwrite them.
    my $fail = sub {
        my $message = shift;
        $restore->();
        die $message;
    };

    # Redirect STDOUT to the write end of the stdout pipe.
    if ($child->stdout_write) {
        my $fn = fileno($child->stdout_write);
        defined $fn or $fail->("No fileno for stdout_write");
        $self->debug("Opening stdout to fileno $fn") if $self->{_debug};
        open( STDOUT, ">&$fn" )
            or $fail->("can't redirect stdout in child pid $$: $!");
        $self->debug("stdout opened") if $self->{_debug};
    }

    # Redirect STDIN from the read end of the stdin pipe.
    if ($child->stdin_read) {
        my $fn = fileno($child->stdin_read);
        defined $fn or $fail->("No fileno for stdin_read");
        $self->debug("Opening stdin to fileno $fn") if $self->{_debug};
        open( STDIN, "<&$fn" )
            or $fail->("can't redirect STDIN in child pid $$: $!");
        $self->debug("stdin opened") if $self->{_debug};
    }

    # Redirect STDERR to the write end of the stderr pipe.
    if ($child->stderr_write) {
        my $fn = fileno($child->stderr_write);
        defined $fn or $fail->("No fileno for stderr_write");
        $self->debug("Opening stderr to fileno $fn") if $self->{_debug};
        open( STDERR, ">&$fn" )
            or $fail->("can't redirect stderr in child: $!");
    }

    # Unbuffer both output handles; STDOUT is left as the selected handle.
    select STDERR;  $| = 1;
    select STDOUT;  $| = 1;

    Win32::Process::Create(
        $process,
        $program->[0],
        "@$program",
        1,                      # let the new process inherit our handles
        NORMAL_PRIORITY_CLASS,
        '.'
    ) or $fail->(
        "can't exec (@$program) using Win32::Process; Reason: ".
        Win32::FormatMessage(Win32::GetLastError())
    );

    # Feed the child its input, if it was supplied as a scalar reference.
    syswrite($child->stdin_write, ${$child->stdin}, length(${$child->stdin}), 0)
        if ref($child->stdin) eq 'SCALAR';

    $restore->();
    return $process;
}

sub fork_code {
# ------------------------------------------------------------------------
# Runs the current child's program (a code reference) in a fork()ed child.
#
# In the parent: waits up to 2000 ms on a mutex for the child to finish
# wiring up its standard handles, writes the child's stdin data to
# stdin_write when stdin is a scalar reference, and returns the child's pid.
#
# In the child: aliases STDOUT/STDIN/STDERR to the child's pipe ends,
# releases the mutex, calls the code reference, then exits with status 0.
# If fork() failed, $self->fatal is called with the FORK error code.
#
    my $self = shift;

    # Hack to keep from forking too many process too fast, perl on windows
    # tends to segv when that happens
    select undef, undef, undef, 0.5;
    
    # So we know when the child is finished setting up
    # NOTE(review): the mutex name 'CHILD' is fixed, so every call (and any
    # other process using that name) shares one mutex. The first argument
    # (1) presumably requests initial ownership for the creating thread;
    # confirm against the Win32::Mutex documentation that wait() below
    # really blocks until the child calls release().
    my $mutex = new Win32::Mutex(1, 'CHILD');
    my $pid;
    if ($pid = fork) { # Parent
        my $child = $self->{current_child};
        # Give the child at most two seconds to signal that it is ready
        $mutex->wait(2000);
        print {$child->stdin_write} ${$child->stdin}
            if ref($child->stdin) eq 'SCALAR';
        return $pid;
    }
    else {
        # This branch is reached both in the child (pid 0) and when fork
        # failed (pid undef); the latter is reported through fatal().
        $self->fatal( FORK => "$!" ) unless defined $pid;
        $self->debug("Forked: $$\n") if $self->{_debug} > 1;

        # Hack to keep the child from destroying the mutex
        # (the object is re-blessed into a subclass whose DESTROY does nothing)
        {
            package GT::IPC::Run::Mutex;
            @GT::IPC::Run::Mutex::ISA = 'Win32::Mutex';
            sub DESTROY {}
        }
        bless $mutex, 'GT::IPC::Run::Mutex';

        my $child = $self->{current_child};
        my ($stdout, $stderr, $stdin) = (
            $child->stdout_write,
            $child->stderr_write,
            $child->stdin_read
        );

        # Redirect STDOUT to the write end of the stdout pipe.
        # (done by glob assignment rather than by re-opening the handle)
        if (defined $stdout) {
            *STDOUT = $stdout;
            $self->debug("stdout opened") if $self->{_debug};
        }

        # Redirect STDIN from the read end of the stdin pipe.
        if (defined $stdin) {
            *STDIN = $stdin;
            $self->debug("stdin opened") if $self->{_debug};
        }

        # Redirect STDERR to the write end of the stderr pipe.
        if (defined $stderr) {
            *STDERR = $stderr;
        }

        # Unbuffer both output handles; STDOUT is left as the selected handle.
        select STDERR;  $| = 1;
        select STDOUT;  $| = 1;

        # Tell the parent that the stdio has been set up.
        $mutex->release;

        # Launch the code reference
        $child->program->();
        # Close the output handles (when they are open) before exiting
        close STDOUT if defined fileno STDOUT;
        close STDERR if defined fileno STDERR;
        exit(0);
    }
}

sub do_one_loop {
# ------------------------------------------------------------------------
# Performs one polling pass over the children.
#
#   1. check_for_exit() records newly exited children in $self->{goners}.
#   2. For every child, at most READ_BLOCK bytes are read from stdout_read
#      and then from stderr_read. Data is passed to the matching
#      handler_stdout / handler_stderr filter via put(). A read that yields
#      nothing increments $child->{socket_err}, unless $! is EAGAIN.
#   3. done_callback is invoked (once) for exited children whose
#      socket_err count is still below 2.
#   4. When every child has socket_err >= 2 and there are at least as many
#      goners as children, the children are removed, their handles are
#      dropped from the select object and their filters are flushed.
#
# $wait is accepted for interface compatibility but is not used by this
# polling implementation.
#
# Returns 0 once step 4 has run (nothing left to do), 1 otherwise.
#
    my ($self, $wait) = @_;

    $self->check_for_exit;
    $self->debug(
        "Children: ".  keys(%{$self->{children}}).
        "; goners: ".  keys(%{$self->{goners}})
    ) if $self->{_debug};

    for my $pid (keys %{$self->{children}}) {
        my $child = $self->{children}{$pid};

        # Same handling for both output streams, stdout first.
        for my $stream (qw(stdout stderr)) {
            my $reader = "${stream}_read";
            my $fh = $child->$reader or next;

            my $ret = sysread($fh, my $buff, READ_BLOCK);
            if ($ret) {
                # Process callbacks
                my $label = uc $stream;
                $self->debug("[$pid $label]: `$buff'\n") if $self->{_debug} > 1;
                my $handler = "handler_$stream";
                my $filter  = $child->$handler;
                $filter->put(\$buff) if defined $filter;
            }
            # Fun stuff with win32
            # NOTE(review): sysread returns 0 at end of file without setting
            # $!, so in that case the value tested here is left over from an
            # earlier call.
            elsif ($! == EAGAIN) {
                $self->debug(
                    "1: $pid: Socket Read: Error: $!; OSError: $^E; Errno: ".(0+$!)."; OSErrno: ".(0+$^E)
                ) if $self->{_debug};
            }
            else {
                # Every other failure -- WSAEWOULDBLOCK included, whether or
                # not the child has already exited -- counts against the
                # child. (The original had two byte-identical branches here.)
                $child->{socket_err}++;
                $self->debug(
                    "2: $pid: Socket Read: Error: $!; OSError: $^E; Errno: ".(0+$!)."; OSErrno: ".(0+$^E)
                ) if $self->{_debug} > 1;
            }
        }
    }

    # Call the "done" callback for anything that has exited and has no pending output
    # NOTE(review): a child is dropped from %not_pending once socket_err
    # reaches 2, so its done_callback is skipped from then on -- confirm
    # that this is the intended meaning of "pending".
    my %not_pending = %{$self->{children}};
    for my $child (values %{$self->{children}}) {
        delete $not_pending{$child->{pid}}
            if ($child->{socket_err} || 0) >= 2;
    }
    for my $pid (keys %{$self->{goners}}) {
        my $child = $self->{children}{$pid} or next;
        next unless $not_pending{$pid} and not $child->called_done;

        $child->done_callback->($pid, $self->{goners}{$pid})
            if $child->done_callback;
        $child->called_done(1);
    }

    # Counted after the callbacks have run. grep in scalar context yields 0
    # (not undef) when no child qualifies.
    my $done     = grep { ($_->{socket_err} || 0) >= 2 } values %{$self->{children}};
    my $children = keys %{$self->{children}};
    my $goners   = keys %{$self->{goners}};

    # More to do while some child can still produce output or has not exited
    return 1 unless $done == $children and $children <= $goners;

    $self->debug("Nothing else to do, flushing buffers")
        if $self->{_debug};

    # Flush output filters
    for my $pid (keys %{$self->{children}}) {
        my $child = delete $self->{children}{$pid};
        $self->select->remove_stdout($pid);
        $self->select->remove_stderr($pid);
        if ($child->handler_stdout) {
            $child->handler_stdout->flush;
        }
        if ($child->handler_stderr) {
            $child->handler_stderr->flush;
        }
    }

    # Nothing left to do
    $self->debug("Returning 0") if $self->{_debug};
    return 0;
}

my $warned;
sub check_for_exit {
# ------------------------------------------------------------------------
# Looks at every child that is not yet listed in $self->{goners} and
# records the ones that have exited. For each newly exited child the wait
# status is stored in $self->{goners}{$pid} and exit_callback, when set, is
# called with ($pid, $status).
#
# Forked children are polled with a non-blocking waitpid(): a result below
# -1 stores $?, a result of exactly -1 stores 0. Children started through
# Win32::Process are polled with Wait(0) and their exit code is shifted
# left by 8 bits before being stored.
#
# The original author notes that output from a Win32::Process child cannot
# be collected reliably this way: its handles are not flushed on exit, so
# the last output is likely to be lost if the process blocks a little.
#
    my $self = shift;

    for my $pid (keys %{$self->{children}}) {
        my $child = $self->{children}{$pid};
        next if exists $self->{goners}{$pid};

        unless ($child->forked) {
            # Started with Win32::Process
            $self->debug("Checking if $pid is running") if $self->{_debug};

            if ($child->process and $child->process->Wait(0)) {
                my $exit_code;
                $child->process->GetExitCode($exit_code);

                $self->{goners}{$pid} = $exit_code << 8;
                $child->exit_callback->($pid, ($exit_code << 8))
                    if $child->exit_callback;
                $self->debug("$pid exited with status: $self->{goners}{$pid}")
                    if $self->{_debug};
            }
            else {
                $self->debug("$pid is still running") if $self->{_debug};
            }
            next;
        }

        # Forked child: check if the program exited
        my $waited = waitpid($pid, WNOHANG);
        $self->debug("waited: $waited; pid: $pid")
            if $self->{_debug};

        if ($waited == -1) {
            $self->{goners}{$pid} = 0;
            $child->exit_callback->($pid, 0)
                if $child->exit_callback;
        }
        elsif ($waited < -1) {
            $self->{goners}{$pid} = $?;
            $child->exit_callback->($pid, $?)
                if $child->exit_callback;
            $self->debug(
                "forked child $pid exited with exit status (".
                ($self->{goners}{$pid} >> 8).
                ")\n"
            ) if $self->{_debug};
        }
    }
}

sub oneway {
# ------------------------------------------------------------------------
# Delegates to the parent class's oneway(), always passing 'inet', and
# returns whatever that call returns.
#
    my $self = shift;
    return $self->SUPER::oneway('inet');
}

sub twoway {
# ------------------------------------------------------------------------
# Delegates to the parent class's twoway(), always passing 'inet', and
# returns whatever that call returns.
#
    my $self = shift;
    return $self->SUPER::twoway('inet');
}

sub stop_blocking {
# ------------------------------------------------------------------------
# Puts $socket_handle into non-blocking mode with the Winsock FIONBIO
# ioctl. Returns the (true) result of ioctl; dies with "ioctl: $^E" when
# the call fails.
#
    my ($self, $socket_handle) = @_;

    # FIONBIO reads an unsigned 32-bit flag through a pointer. Perl hands a
    # string argument to ioctl as that buffer, so the flag has to be packed;
    # a plain "1" would pass the character's byte value followed by
    # whatever happens to be in the rest of the buffer.
    my $set_it = pack('L', 1);

    # 126 is FIONBIO (some docs say 0x7F << 16)
    ioctl( $socket_handle,
        0x80000000 | (4 << 16) | (ord('f') << 8) | 126,
        $set_it
    ) or die "ioctl: $^E";
}

sub start_blocking {
# ------------------------------------------------------------------------
# Puts $socket_handle back into blocking mode with the Winsock FIONBIO
# ioctl. Returns the (true) result of ioctl; dies with "ioctl: $^E" when
# the call fails.
#
    my ($self, $socket_handle) = @_;

    # FIONBIO reads an unsigned 32-bit flag through a pointer, and Perl hands
    # a string argument to ioctl as that buffer. The string "0" is the byte
    # 0x30 -- a non-zero flag -- so it would switch non-blocking mode ON
    # instead of off. A packed zero is what actually clears it.
    my $unset_it = pack('L', 0);

    # 126 is FIONBIO (some docs say 0x7F << 16)
    ioctl( $socket_handle,
        0x80000000 | (4 << 16) | (ord('f') << 8) | 126,
        $unset_it
    ) or die "ioctl: $^E";
}

1;

