package staros import ( "bytes" "context" "errors" "io" "os" "os/exec" "sync" "sync/atomic" "time" ) var errNilCommand = errors.New("nil command") var errCommandStdinUnavailable = errors.New("command stdin is not available") var errCommandProcessNotStarted = errors.New("command process is not started") var errCommandAlreadyStarted = errors.New("command already started") var errCommandAlreadyReleased = errors.New("command already released") var errCommandStdinClosed = errors.New("command stdin is closed") var errCommandAlreadyDetached = errors.New("command already detached") var errCommandDetached = errors.New("command already detached") var errCommandRedirectNil = errors.New("command redirect target is nil") const starCmdUnknownExitCode = -999 const starCmdStreamBuffer = 64 type starCmdStream int const ( starCmdStdout starCmdStream = iota starCmdStderr ) // StarCmdOutputStream identifies which process stream produced a chunk. type StarCmdOutputStream int const ( StarCmdOutputStdout StarCmdOutputStream = iota StarCmdOutputStderr ) // StarCmdOutput is a streamed stdout/stderr chunk. type StarCmdOutput struct { Stream StarCmdOutputStream Data []byte } type starCmdWriter struct { cmd *StarCmd stream starCmdStream } func (writer starCmdWriter) Write(data []byte) (int, error) { if writer.cmd == nil { return 0, errNilCommand } writer.cmd.lock.Lock() writer.cmd.ensureBuffers() var redirect io.Writer switch writer.stream { case starCmdStdout: if _, err := writer.cmd.stdoutBuf.Write(data); err != nil { writer.cmd.lock.Unlock() return 0, err } writer.cmd.stdout = append(writer.cmd.stdout, data...) writer.cmd.publishStreamLocked(starCmdStdout, data) redirect = writer.cmd.stdoutRedirect case starCmdStderr: if _, err := writer.cmd.stderrBuf.Write(data); err != nil { writer.cmd.lock.Unlock() return 0, err } writer.cmd.errout = append(writer.cmd.errout, data...) writer.cmd.publishStreamLocked(starCmdStderr, data) redirect = writer.cmd.stderrRedirect default: writer.cmd.lock.Unlock() return 0, errors.New("unknown command stream") } writer.cmd.lock.Unlock() if redirect != nil { writer.cmd.redirectLock.Lock() n, err := redirect.Write(data) writer.cmd.redirectLock.Unlock() if err != nil { return n, err } if n != len(data) { return n, io.ErrShortWrite } } return len(data), nil } //StarCmd Is Here type StarCmd struct { CMD *exec.Cmd infile io.WriteCloser inclosed bool running int32 started int32 released int32 detached int32 //Store AlL of the Standed Outputs stdout []byte //Store All of the Standed Errors errout []byte runerr error exitcode int stdoutBuf *bytes.Buffer stderrBuf *bytes.Buffer lock sync.Mutex prewrite []string prewritetime time.Duration stopctxfunc context.CancelFunc stopctx context.Context doneOnce sync.Once done chan struct{} resultOnce sync.Once resultDone chan struct{} stdoutStream []chan []byte stderrStream []chan []byte outputStream []chan StarCmdOutput streamClosed bool stdoutRedirect io.Writer stderrRedirect io.Writer redirectLock sync.Mutex closeAfter []io.Closer } func (starcli *StarCmd) ensureBuffers() { if starcli.stdoutBuf == nil { starcli.stdoutBuf = bytes.NewBuffer(make([]byte, 0)) } if starcli.stderrBuf == nil { starcli.stderrBuf = bytes.NewBuffer(make([]byte, 0)) } } func (starcli *StarCmd) ensureStopContext() { if starcli.stopctx == nil || starcli.stopctxfunc == nil { starcli.stopctx, starcli.stopctxfunc = context.WithCancel(context.Background()) } if starcli.done == nil { starcli.done = make(chan struct{}) } if starcli.resultDone == nil { starcli.resultDone = make(chan struct{}) } } func (starcli *StarCmd) ensureResultDone() <-chan struct{} { if starcli == nil { closed := make(chan struct{}) close(closed) return closed } starcli.lock.Lock() starcli.ensureStopContext() done := starcli.resultDone starcli.lock.Unlock() return done } func (starcli *StarCmd) signalResultDone() { if starcli == nil { return } starcli.resultOnce.Do(func() { starcli.lock.Lock() starcli.ensureStopContext() done := starcli.resultDone starcli.lock.Unlock() close(done) }) } func (starcli *StarCmd) finish() { if starcli == nil { return } starcli.setRunning(false) starcli.signalResultDone() if starcli.stopctxfunc != nil { starcli.stopctxfunc() } starcli.doneOnce.Do(func() { starcli.lock.Lock() done := starcli.done stdoutStream := starcli.stdoutStream stderrStream := starcli.stderrStream outputStream := starcli.outputStream infile := starcli.infile closeAfter := starcli.closeAfter starcli.stdoutStream = nil starcli.stderrStream = nil starcli.outputStream = nil starcli.closeAfter = nil starcli.streamClosed = true if infile != nil && !starcli.inclosed { starcli.inclosed = true } else { infile = nil } starcli.lock.Unlock() for _, stream := range stdoutStream { close(stream) } for _, stream := range stderrStream { close(stream) } for _, stream := range outputStream { close(stream) } if infile != nil { _ = infile.Close() } for _, closer := range closeAfter { _ = closer.Close() } if done != nil { close(done) } }) } func (starcli *StarCmd) publishStreamLocked(stream starCmdStream, data []byte) { if starcli == nil || starcli.streamClosed { return } switch stream { case starCmdStdout: for _, receiver := range starcli.stdoutStream { select { case receiver <- append([]byte(nil), data...): default: } } for _, receiver := range starcli.outputStream { select { case receiver <- StarCmdOutput{Stream: StarCmdOutputStdout, Data: append([]byte(nil), data...)}: default: } } case starCmdStderr: for _, receiver := range starcli.stderrStream { select { case receiver <- append([]byte(nil), data...): default: } } for _, receiver := range starcli.outputStream { select { case receiver <- StarCmdOutput{Stream: StarCmdOutputStderr, Data: append([]byte(nil), data...)}: default: } } } } func (starcli *StarCmd) registerByteStream(selectStream starCmdStream) <-chan []byte { stream := make(chan []byte, starCmdStreamBuffer) if starcli == nil { close(stream) return stream } starcli.lock.Lock() if starcli.streamClosed { close(stream) } else { switch selectStream { case starCmdStdout: starcli.stdoutStream = append(starcli.stdoutStream, stream) case starCmdStderr: starcli.stderrStream = append(starcli.stderrStream, stream) default: close(stream) } } starcli.lock.Unlock() return stream } func (starcli *StarCmd) ensureConfigurable() error { if starcli == nil || starcli.CMD == nil { return errNilCommand } if atomic.LoadInt32(&starcli.started) != 0 { return errCommandAlreadyStarted } if atomic.LoadInt32(&starcli.detached) != 0 { return errCommandDetached } return nil } func Command(command string, args ...string) (*StarCmd, error) { return newStarCmd(exec.Command(command, args...)) } func CommandContext(ctx context.Context, command string, args ...string) (*StarCmd, error) { return newStarCmd(exec.CommandContext(ctx, command, args...)) } func newStarCmd(cmd *exec.Cmd) (*StarCmd, error) { var err error shell := &StarCmd{ CMD: cmd, prewritetime: time.Millisecond * 200, stdoutBuf: bytes.NewBuffer(make([]byte, 0)), stderrBuf: bytes.NewBuffer(make([]byte, 0)), done: make(chan struct{}), resultDone: make(chan struct{}), exitcode: starCmdUnknownExitCode, } shell.stopctx, shell.stopctxfunc = context.WithCancel(context.Background()) shell.infile, err = shell.CMD.StdinPipe() if err != nil { return shell, err } shell.CMD.Stdout = starCmdWriter{cmd: shell, stream: starCmdStdout} shell.CMD.Stderr = starCmdWriter{cmd: shell, stream: starCmdStderr} return shell, nil } func (starcli *StarCmd) NowLineOutput() (string, error) { if starcli == nil { return "", errNilCommand } starcli.lock.Lock() defer starcli.lock.Unlock() starcli.ensureBuffers() buf, _ := starcli.stdoutBuf.ReadBytes('\n') buferr, _ := starcli.stderrBuf.ReadBytes(byte('\n')) if len(buferr) != 0 { return string(buf), errors.New(string(buferr)) } return string(buf), nil } func (starcli *StarCmd) NowLineStdOut() string { if starcli == nil { return "" } starcli.lock.Lock() defer starcli.lock.Unlock() starcli.ensureBuffers() buf, _ := starcli.stdoutBuf.ReadBytes('\n') return string(buf) } func (starcli *StarCmd) NowLineStdErr() error { if starcli == nil { return errNilCommand } starcli.lock.Lock() defer starcli.lock.Unlock() starcli.ensureBuffers() buferr, _ := starcli.stderrBuf.ReadBytes(byte('\n')) if len(buferr) != 0 { return errors.New(string(buferr)) } return nil } func (starcli *StarCmd) NowAllOutput() (string, error) { if starcli == nil { return "", errNilCommand } var outstr string starcli.lock.Lock() defer starcli.lock.Unlock() starcli.ensureBuffers() buf := make([]byte, starcli.stdoutBuf.Len()) n, _ := starcli.stdoutBuf.Read(buf) runerr := starcli.runerr if n != 0 { outstr = string(buf[:n]) } if runerr != nil { return outstr, runerr } buf = make([]byte, starcli.stderrBuf.Len()) n, _ = starcli.stderrBuf.Read(buf) if n != 0 { return outstr, errors.New(string(buf[:n])) } return outstr, nil } func (starcli *StarCmd) NowStdOut() string { if starcli == nil { return "" } var outstr string starcli.lock.Lock() defer starcli.lock.Unlock() starcli.ensureBuffers() buf := make([]byte, starcli.stdoutBuf.Len()) n, _ := starcli.stdoutBuf.Read(buf) if n != 0 { outstr = string(buf[:n]) } return outstr } func (starcli *StarCmd) NowStdErr() error { if starcli == nil { return errNilCommand } starcli.lock.Lock() defer starcli.lock.Unlock() starcli.ensureBuffers() buf := make([]byte, starcli.stderrBuf.Len()) n, _ := starcli.stderrBuf.Read(buf) if n != 0 { return errors.New(string(buf[:n])) } return nil } func (starcli *StarCmd) AllOutPut() (string, error) { if starcli == nil { return "", errNilCommand } starcli.lock.Lock() defer starcli.lock.Unlock() err := starcli.runerr if err == nil && len(starcli.errout) != 0 { err = errors.New(string(starcli.errout)) } return string(starcli.stdout), err } func (starcli *StarCmd) AllStdOut() string { if starcli == nil { return "" } starcli.lock.Lock() defer starcli.lock.Unlock() return string(starcli.stdout) } func (starcli *StarCmd) AllStdErr() error { if starcli == nil { return errNilCommand } starcli.lock.Lock() defer starcli.lock.Unlock() err := starcli.runerr if err == nil && len(starcli.errout) != 0 { err = errors.New(string(starcli.errout)) } return err } func (starcli *StarCmd) setRunning(alive bool) { if starcli == nil { return } if alive { atomic.StoreInt32(&starcli.running, 1) return } atomic.StoreInt32(&starcli.running, 0) } func (starcli *StarCmd) Start() error { if starcli == nil || starcli.CMD == nil { return errNilCommand } if atomic.LoadInt32(&starcli.detached) != 0 { return errCommandDetached } starcli.lock.Lock() starcli.ensureBuffers() starcli.ensureStopContext() starcli.lock.Unlock() if !atomic.CompareAndSwapInt32(&starcli.started, 0, 1) { return errCommandAlreadyStarted } if err := starcli.CMD.Start(); err != nil { starcli.lock.Lock() starcli.runerr = err starcli.exitcode = -1 starcli.lock.Unlock() starcli.signalResultDone() starcli.finish() return err } starcli.setRunning(true) go func() { err := starcli.CMD.Wait() if err != nil { starcli.lock.Lock() starcli.runerr = err starcli.lock.Unlock() } if starcli.CMD.ProcessState != nil { starcli.lock.Lock() starcli.exitcode = starcli.CMD.ProcessState.ExitCode() starcli.lock.Unlock() } starcli.signalResultDone() starcli.finish() }() go func(ctx context.Context) { starcli.lock.Lock() prewrite := append([]string(nil), starcli.prewrite...) prewritetime := starcli.prewritetime starcli.lock.Unlock() for _, v := range prewrite { select { case <-ctx.Done(): return default: } _ = starcli.WriteCmdE(v) time.Sleep(prewritetime) } }(starcli.stopctx) return nil } func (starcli *StarCmd) IsRunning() bool { if starcli == nil { return false } return 0 != atomic.LoadInt32(&starcli.running) } func (starcli *StarCmd) runError() error { if starcli == nil { return errNilCommand } starcli.lock.Lock() defer starcli.lock.Unlock() return starcli.runerr } func (starcli *StarCmd) ensureWaitable() error { if starcli == nil || starcli.CMD == nil { return errNilCommand } if atomic.LoadInt32(&starcli.started) == 0 { return errCommandProcessNotStarted } return nil } // Stopped returns a channel that is closed after the command reaches its final state. func (starcli *StarCmd) Stopped() <-chan struct{} { if starcli == nil { closed := make(chan struct{}) close(closed) return closed } starcli.lock.Lock() starcli.ensureStopContext() done := starcli.done starcli.lock.Unlock() return done } // Stoped returns a channel that is closed after the command reaches its final state. // // Deprecated: use Stopped. func (starcli *StarCmd) Stoped() <-chan struct{} { return starcli.Stopped() } // Wait blocks until the command reaches its final state and returns the process wait error. func (starcli *StarCmd) Wait() error { if err := starcli.ensureWaitable(); err != nil { return err } <-starcli.ensureResultDone() return starcli.runError() } // WaitContext blocks until the command reaches its final state or ctx is done. func (starcli *StarCmd) WaitContext(ctx context.Context) error { if err := starcli.ensureWaitable(); err != nil { return err } if ctx == nil { return starcli.Wait() } resultDone := starcli.ensureResultDone() select { case <-resultDone: return starcli.runError() default: } select { case <-resultDone: return starcli.runError() case <-ctx.Done(): select { case <-resultDone: return starcli.runError() default: } return ctx.Err() } } // WaitTimeout blocks until the command reaches its final state or tm elapses. func (starcli *StarCmd) WaitTimeout(tm time.Duration) error { if err := starcli.ensureWaitable(); err != nil { return err } if tm <= 0 { select { case <-starcli.ensureResultDone(): return starcli.runError() default: return ERR_TIMEOUT } } timer := time.NewTimer(tm) defer timer.Stop() resultDone := starcli.ensureResultDone() select { case <-resultDone: return starcli.runError() case <-timer.C: select { case <-resultDone: return starcli.runError() default: return ERR_TIMEOUT } } } // StdoutChan returns a channel that receives future stdout chunks until Stopped closes. func (starcli *StarCmd) StdoutChan() <-chan []byte { return starcli.registerByteStream(starCmdStdout) } // StderrChan returns a channel that receives future stderr chunks until Stopped closes. func (starcli *StarCmd) StderrChan() <-chan []byte { return starcli.registerByteStream(starCmdStderr) } // OutputChan returns a channel that receives future stdout and stderr chunks until Stopped closes. func (starcli *StarCmd) OutputChan() <-chan StarCmdOutput { stream := make(chan StarCmdOutput, starCmdStreamBuffer) if starcli == nil { close(stream) return stream } starcli.lock.Lock() if starcli.streamClosed { close(stream) } else { starcli.outputStream = append(starcli.outputStream, stream) } starcli.lock.Unlock() return stream } // RedirectStdout mirrors stdout into writer while keeping StarCmd output capture enabled. func (starcli *StarCmd) RedirectStdout(writer io.Writer) error { if writer == nil { return errCommandRedirectNil } if err := starcli.ensureConfigurable(); err != nil { return err } starcli.lock.Lock() starcli.stdoutRedirect = writer starcli.lock.Unlock() return nil } // RedirectStderr mirrors stderr into writer while keeping StarCmd error capture enabled. func (starcli *StarCmd) RedirectStderr(writer io.Writer) error { if writer == nil { return errCommandRedirectNil } if err := starcli.ensureConfigurable(); err != nil { return err } starcli.lock.Lock() starcli.stderrRedirect = writer starcli.lock.Unlock() return nil } // RedirectOutput mirrors stdout and stderr into writer while keeping StarCmd capture enabled. func (starcli *StarCmd) RedirectOutput(writer io.Writer) error { if writer == nil { return errCommandRedirectNil } if err := starcli.ensureConfigurable(); err != nil { return err } starcli.lock.Lock() starcli.stdoutRedirect = writer starcli.stderrRedirect = writer starcli.lock.Unlock() return nil } // RedirectStdin replaces the managed stdin pipe with reader. func (starcli *StarCmd) RedirectStdin(reader io.Reader) error { if reader == nil { return errCommandRedirectNil } if err := starcli.ensureConfigurable(); err != nil { return err } starcli.lock.Lock() if starcli.infile != nil && !starcli.inclosed { if err := starcli.infile.Close(); err != nil { starcli.lock.Unlock() return err } } starcli.CMD.Stdin = reader starcli.infile = nil starcli.inclosed = true starcli.lock.Unlock() return nil } func (starcli *StarCmd) addCloseAfter(closer io.Closer) { if starcli == nil || closer == nil { return } starcli.lock.Lock() starcli.closeAfter = append(starcli.closeAfter, closer) starcli.lock.Unlock() } // RedirectStdoutFile mirrors stdout into path while keeping StarCmd output capture enabled. func (starcli *StarCmd) RedirectStdoutFile(path string) error { if err := starcli.ensureConfigurable(); err != nil { return err } file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) if err != nil { return err } if err := starcli.RedirectStdout(file); err != nil { _ = file.Close() return err } starcli.addCloseAfter(file) return nil } // RedirectStderrFile mirrors stderr into path while keeping StarCmd error capture enabled. func (starcli *StarCmd) RedirectStderrFile(path string) error { if err := starcli.ensureConfigurable(); err != nil { return err } file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) if err != nil { return err } if err := starcli.RedirectStderr(file); err != nil { _ = file.Close() return err } starcli.addCloseAfter(file) return nil } // RedirectOutputFile mirrors stdout and stderr into path while keeping StarCmd capture enabled. func (starcli *StarCmd) RedirectOutputFile(path string) error { if err := starcli.ensureConfigurable(); err != nil { return err } file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) if err != nil { return err } if err := starcli.RedirectOutput(file); err != nil { _ = file.Close() return err } starcli.addCloseAfter(file) return nil } // RedirectStdinFile replaces the managed stdin pipe with path opened for reading. func (starcli *StarCmd) RedirectStdinFile(path string) error { if err := starcli.ensureConfigurable(); err != nil { return err } file, err := os.Open(path) if err != nil { return err } if err := starcli.RedirectStdin(file); err != nil { _ = file.Close() return err } starcli.addCloseAfter(file) return nil } func (starcli *StarCmd) Exec(cmd string, wait int) (string, error) { if err := starcli.WriteCmdE(cmd); err != nil { return "", err } time.Sleep(time.Millisecond * time.Duration(wait)) return starcli.NowAllOutput() } func (starcli *StarCmd) WriteCmd(cmdstr string) { _ = starcli.WriteCmdE(cmdstr) } // WriteStdinE writes raw bytes to stdin without appending a newline. func (starcli *StarCmd) WriteStdinE(data []byte) error { if starcli == nil { return errNilCommand } starcli.lock.Lock() infile := starcli.infile inclosed := starcli.inclosed starcli.lock.Unlock() if infile == nil { return errCommandStdinUnavailable } if inclosed { return errCommandStdinClosed } _, err := infile.Write(data) return err } // WriteStdinStringE writes raw text to stdin without appending a newline. func (starcli *StarCmd) WriteStdinStringE(data string) error { return starcli.WriteStdinE([]byte(data)) } // WriteStdinLineE writes text to stdin and appends one newline. func (starcli *StarCmd) WriteStdinLineE(data string) error { return starcli.WriteStdinStringE(data + "\n") } func (starcli *StarCmd) WriteCmdE(cmdstr string) error { return starcli.WriteStdinLineE(cmdstr) } func (starcli *StarCmd) CloseStdin() { _ = starcli.CloseStdinE() } func (starcli *StarCmd) CloseStdinE() error { if starcli == nil { return errNilCommand } starcli.lock.Lock() infile := starcli.infile if infile == nil { starcli.lock.Unlock() return errCommandStdinUnavailable } if starcli.inclosed { starcli.lock.Unlock() return errCommandStdinClosed } starcli.inclosed = true starcli.lock.Unlock() return infile.Close() } func (starcli *StarCmd) PreWrite(cmd ...string) { if starcli == nil { return } starcli.lock.Lock() defer starcli.lock.Unlock() for _, v := range cmd { starcli.prewrite = append(starcli.prewrite, v) } } func (starcli *StarCmd) PreWriteInterval(dt time.Duration) { if starcli == nil { return } starcli.lock.Lock() defer starcli.lock.Unlock() starcli.prewritetime = dt } func (starcli *StarCmd) ExitCode() int { if starcli == nil { return starCmdUnknownExitCode } starcli.lock.Lock() defer starcli.lock.Unlock() return starcli.exitcode } func (starcli *StarCmd) Kill() error { if starcli == nil || starcli.CMD == nil || starcli.CMD.Process == nil { return errCommandProcessNotStarted } err := starcli.CMD.Process.Kill() if err != nil { return err } return nil } func (starcli *StarCmd) GetPid() int { if starcli == nil || starcli.CMD == nil || starcli.CMD.Process == nil { return -1 } return starcli.CMD.Process.Pid } func (starcli *StarCmd) Signal(sig os.Signal) error { if starcli == nil || starcli.CMD == nil || starcli.CMD.Process == nil { return errCommandProcessNotStarted } return starcli.CMD.Process.Signal(sig) }