package starssh import ( "crypto/rand" "encoding/hex" "fmt" "regexp" "strconv" "strings" "time" ) var ( ansiCSIRegexp = regexp.MustCompile(`\x1b\[[0-9;?]*[ -/]*[@-~]`) ansiOSCRegexp = regexp.MustCompile(`\x1b\][^\x07]*(\x07|\x1b\\)`) leadingIntRegexp = regexp.MustCompile(`^[+-]?\d+`) ) func SedColor(str string) string { return stripControlSequences(str) } func normalizeShellOutput(raw string) string { return strings.TrimSpace(strings.ReplaceAll(raw, "\r\n", "\n")) } func stripControlSequences(raw string) string { cleaned := ansiOSCRegexp.ReplaceAllString(raw, "") cleaned = ansiCSIRegexp.ReplaceAllString(cleaned, "") cleaned = strings.ReplaceAll(cleaned, "\r", "") cleaned = strings.Map(func(r rune) rune { if r == '\n' || r == '\t' { return r } if r < 0x20 || r == 0x7f { return -1 } return r }, cleaned) return cleaned } func stripLeadingEcho(output string, command string, markerCommand string) string { result := output if command == "" { return result } withMarker := command if markerCommand != "" { withMarker += "\n" + markerCommand } if strings.HasPrefix(result, withMarker) { return strings.TrimSpace(strings.TrimPrefix(result, withMarker)) } if strings.HasPrefix(result, command) { return strings.TrimSpace(strings.TrimPrefix(result, command)) } return result } func collectLinesWithoutTokens(output string, tokens ...string) string { lines := strings.Split(output, "\n") filtered := make([]string, 0, len(lines)) for _, line := range lines { skip := false for _, token := range tokens { if token != "" && strings.Contains(line, token) { skip = true break } } if !skip { filtered = append(filtered, line) } } return strings.TrimSpace(strings.Join(filtered, "\n")) } func collectLinesForCommandOutput(output string, promptToken string, tokens ...string) string { lines := strings.Split(output, "\n") filtered := make([]string, 0, len(lines)) for _, line := range lines { skip := false for _, token := range tokens { if token != "" && strings.Contains(line, token) { skip = true break } } if !skip && promptToken != "" && strings.Contains(line, promptToken) { skip = true } if !skip { filtered = append(filtered, line) } } return strings.TrimSpace(strings.Join(filtered, "\n")) } func shellSingleQuote(s string) string { return "'" + strings.ReplaceAll(s, "'", "'\"'\"'") + "'" } func normalizeBufferSize(bufcap int) int { if bufcap <= 0 { return defaultTransferBufferSize } return bufcap } func newCommandTokens() (beginToken string, endToken string) { nonce := newNonce(8) return "__STARSSH_BEGIN_" + nonce + "__", "__STARSSH_END_" + nonce + "__" } func newNonce(size int) string { if size <= 0 { size = 8 } buf := make([]byte, size) if _, err := rand.Read(buf); err != nil { return fmt.Sprintf("%d", time.Now().UnixNano()) } return strings.ToUpper(hex.EncodeToString(buf)) } func splitByEndToken(output string, endToken string) (before string, exitCode int, found bool, parseErr error) { prefix := endToken + ":" lines := strings.Split(output, "\n") beforeLines := make([]string, 0, len(lines)) for _, line := range lines { trimmedLine := strings.TrimSpace(line) if !strings.HasPrefix(trimmedLine, prefix) { beforeLines = append(beforeLines, line) continue } codeText := strings.TrimSpace(trimmedLine[len(prefix):]) match := leadingIntRegexp.FindString(codeText) if match == "" || match != codeText { beforeLines = append(beforeLines, line) continue } code, err := strconv.Atoi(match) if err != nil { beforeLines = append(beforeLines, line) continue } return strings.Join(beforeLines, "\n"), code, true, nil } return strings.Join(beforeLines, "\n"), 0, false, nil }