#!/usr/bin/env python3 """ Genpix SkyWalker-1 DiSEqC 1.2 motor control tool. Subcommands: halt - Stop motor movement east/west - Drive motor (continuous or stepped) goto - Go to stored position slot store - Store current position to slot gotox - USALS GotoX (automatic orbital positioning) limit - Set software travel limit nolimits - Disable software limits raw - Send raw DiSEqC bytes interactive - Keyboard-driven jog controller with live signal """ import sys import os import argparse import time import atexit import select import termios import tty # Add tools directory to path for library import sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from skywalker_lib import SkyWalker1, signal_bar # -- Safety timeout for continuous drive -- CONTINUOUS_DRIVE_TIMEOUT = 30.0 # seconds # -- Subcommand handlers -- def cmd_halt(sw: SkyWalker1, args: argparse.Namespace) -> None: """Stop motor movement.""" sw.motor_halt() print("Motor halted") def cmd_east(sw: SkyWalker1, args: argparse.Namespace) -> None: """Drive motor east.""" steps = args.steps if steps: sw.motor_drive_east(steps) print(f"Driving east {steps} step(s)") else: sw.motor_drive_east(0) print(f"Driving east (continuous) -- will auto-halt after {CONTINUOUS_DRIVE_TIMEOUT:.0f}s") print("Press Ctrl-C to stop") _wait_with_halt(sw, CONTINUOUS_DRIVE_TIMEOUT) def cmd_west(sw: SkyWalker1, args: argparse.Namespace) -> None: """Drive motor west.""" steps = args.steps if steps: sw.motor_drive_west(steps) print(f"Driving west {steps} step(s)") else: sw.motor_drive_west(0) print(f"Driving west (continuous) -- will auto-halt after {CONTINUOUS_DRIVE_TIMEOUT:.0f}s") print("Press Ctrl-C to stop") _wait_with_halt(sw, CONTINUOUS_DRIVE_TIMEOUT) def _wait_with_halt(sw: SkyWalker1, timeout: float) -> None: """Wait for timeout or Ctrl-C, then halt the motor.""" try: time.sleep(timeout) except KeyboardInterrupt: pass finally: sw.motor_halt() print("\nMotor halted") def cmd_goto(sw: SkyWalker1, args: argparse.Namespace) -> None: """Go to a stored position slot.""" slot = args.slot if slot == 0: print("Going to reference position (slot 0)") else: print(f"Going to stored position {slot}") sw.motor_goto_position(slot) def cmd_store(sw: SkyWalker1, args: argparse.Namespace) -> None: """Store the current dish position into a slot.""" slot = args.slot sw.motor_store_position(slot) print(f"Current position stored in slot {slot}") def cmd_gotox(sw: SkyWalker1, args: argparse.Namespace) -> None: """USALS GotoX: calculate and drive to satellite longitude.""" sat_lon = args.sat obs_lon = args.lon obs_lat = args.lat # Import usals_angle for display from skywalker_lib import usals_angle, usals_encode_angle angle = usals_angle(obs_lon, sat_lon, obs_lat) hh, ll = usals_encode_angle(angle) direction = "west" if angle < 0 else "east" print(f"USALS GotoX") print(f" Observer: {obs_lon:.2f} lon, {obs_lat:.2f} lat") print(f" Satellite: {sat_lon:.2f} lon") print(f" Motor angle: {abs(angle):.2f} deg {direction}") print(f" DiSEqC: E0 31 6E {hh:02X} {ll:02X}") sw.motor_goto_x(obs_lon, sat_lon) print(" Command sent") def cmd_limit(sw: SkyWalker1, args: argparse.Namespace) -> None: """Set a software travel limit at the current position.""" direction = args.direction sw.motor_set_limit(direction) print(f"Software {direction} limit set at current position") def cmd_nolimits(sw: SkyWalker1, args: argparse.Namespace) -> None: """Disable software travel limits.""" sw.motor_disable_limits() print("Software limits disabled") def cmd_raw(sw: SkyWalker1, args: argparse.Namespace) -> None: """Send a raw DiSEqC command.""" raw_bytes = bytes(int(b, 16) for b in args.bytes) if len(raw_bytes) < 3 or len(raw_bytes) > 6: print("DiSEqC message must be 3-6 bytes") sys.exit(1) print(f"Sending DiSEqC: {raw_bytes.hex(' ')}") sw.send_diseqc_message(raw_bytes) print(" OK") # -- Interactive jog controller -- def cmd_interactive(sw: SkyWalker1, args: argparse.Namespace) -> None: """Keyboard-driven jog controller with live signal monitoring.""" # Register atexit halt -- fires on unexpected exit, Ctrl-C leak, etc. def emergency_halt(): try: sw.motor_halt() except Exception: pass atexit.register(emergency_halt) # Save terminal state and switch to raw mode fd = sys.stdin.fileno() old_attrs = termios.tcgetattr(fd) def restore_terminal(): termios.tcsetattr(fd, termios.TCSAFLUSH, old_attrs) # Show cursor, clear line sys.stdout.write("\033[?25h\n") sys.stdout.flush() atexit.register(restore_terminal) tty.setraw(fd) # Hide cursor for cleaner display sys.stdout.write("\033[?25l") sys.stdout.flush() _interactive_loop(sw, fd, args.verbose) # Cleanup (atexit handles restore, but be explicit for normal exit) restore_terminal() emergency_halt() atexit.unregister(restore_terminal) atexit.unregister(emergency_halt) def _interactive_loop(sw: SkyWalker1, fd: int, verbose: bool) -> None: """Main loop for interactive jog mode.""" state = { "driving": None, # None, 'east', or 'west' "drive_start": 0.0, # time.time() when drive began "store_mode": False, # True = next digit stores position "running": True, } POLL_INTERVAL = 0.5 # seconds between signal refreshes (~2 Hz) last_refresh = 0.0 _draw_header() while state["running"]: now = time.time() # Auto-halt safety: stop after CONTINUOUS_DRIVE_TIMEOUT if state["driving"] and (now - state["drive_start"]) >= CONTINUOUS_DRIVE_TIMEOUT: sw.motor_halt() _status_line(f"AUTO-HALT: {CONTINUOUS_DRIVE_TIMEOUT:.0f}s safety limit reached") state["driving"] = None # Refresh signal display at ~2 Hz if now - last_refresh >= POLL_INTERVAL: try: sig = sw.signal_monitor() _draw_signal(sig, state) except Exception: _draw_signal(None, state) last_refresh = now # Non-blocking key read if select.select([fd], [], [], 0.05)[0]: ch = os.read(fd, 8) _handle_key(sw, ch, state) def _handle_key(sw: SkyWalker1, ch: bytes, state: dict) -> None: """Process a keypress in interactive mode.""" # Escape sequences for arrow keys if ch == b'\x1b[D' or ch == b'\x1b[C': direction = 'west' if ch == b'\x1b[D' else 'east' if state["driving"] != direction: if direction == 'east': sw.motor_drive_east(0) else: sw.motor_drive_west(0) state["driving"] = direction state["drive_start"] = time.time() _status_line(f"Driving {direction}...") return # Space = halt if ch == b' ': sw.motor_halt() state["driving"] = None _status_line("Halted") return # q = quit if ch in (b'q', b'Q', b'\x03'): # q, Q, or Ctrl-C sw.motor_halt() state["driving"] = None state["running"] = False _status_line("Quitting...") return # s = enter store mode (next digit saves position) if ch == b's' or ch == b'S': state["store_mode"] = True _status_line("Store mode: press 1-9 to save position") return # g = prompt for USALS GotoX if ch == b'g' or ch == b'G': _gotox_prompt(sw, state) return # Digits 1-9: goto or store if len(ch) == 1 and ord(ch) in range(ord('1'), ord('9') + 1): slot = ord(ch) - ord('0') if state["store_mode"]: sw.motor_store_position(slot) _status_line(f"Position stored in slot {slot}") state["store_mode"] = False else: sw.motor_goto_position(slot) state["driving"] = None _status_line(f"Going to position {slot}") return # 0 = goto reference if ch == b'0': if state["store_mode"]: _status_line("Slot 0 is reference -- not storable") state["store_mode"] = False else: sw.motor_goto_position(0) state["driving"] = None _status_line("Going to reference (slot 0)") return # Unknown key -- clear store mode if state["store_mode"]: state["store_mode"] = False _status_line("Store cancelled") def _gotox_prompt(sw: SkyWalker1, state: dict) -> None: """ Prompt for USALS GotoX parameters in raw terminal mode. Reads satellite longitude and observer longitude character-by-character since we're in raw mode and can't use input(). """ _status_line("GotoX: enter satellite longitude (e.g. -97.5): ") sat_str = _raw_readline() if sat_str is None: _status_line("GotoX cancelled") return _status_line(f"Sat {sat_str} -- enter observer longitude: ") obs_str = _raw_readline() if obs_str is None: _status_line("GotoX cancelled") return try: sat_lon = float(sat_str) obs_lon = float(obs_str) except ValueError: _status_line("Invalid coordinates") return from skywalker_lib import usals_angle angle = usals_angle(obs_lon, sat_lon) direction = "W" if angle < 0 else "E" sw.motor_goto_x(obs_lon, sat_lon) state["driving"] = None _status_line(f"GotoX: sat {sat_lon} obs {obs_lon} -> {abs(angle):.1f} deg {direction}") def _raw_readline() -> str | None: """Read a line of text in raw terminal mode, echoing characters.""" fd = sys.stdin.fileno() buf = [] while True: if select.select([fd], [], [], 30.0)[0]: ch = os.read(fd, 1) if ch == b'\r' or ch == b'\n': sys.stdout.write("\r\n") sys.stdout.flush() return ''.join(buf) if ch == b'\x03' or ch == b'\x1b': # Ctrl-C or Escape return None if ch == b'\x7f' or ch == b'\x08': # Backspace if buf: buf.pop() sys.stdout.write("\b \b") sys.stdout.flush() continue # Accept digits, minus, period c = ch.decode('ascii', errors='ignore') if c in '0123456789.-': buf.append(c) sys.stdout.write(c) sys.stdout.flush() else: # Timeout waiting for input return None # -- Display helpers -- def _draw_header() -> None: """Print the interactive mode header (once at startup).""" sys.stdout.write("\r\n") sys.stdout.write(" SkyWalker-1 Motor Control\r\n") sys.stdout.write(" ========================\r\n") sys.stdout.write(" Left/Right : jog west/east (continuous)\r\n") sys.stdout.write(" Space : halt\r\n") sys.stdout.write(" 1-9 : goto stored position\r\n") sys.stdout.write(" s + 1-9 : store to position slot\r\n") sys.stdout.write(" g : USALS GotoX prompt\r\n") sys.stdout.write(" q : quit\r\n") sys.stdout.write("\r\n") sys.stdout.flush() def _draw_signal(sig: dict | None, state: dict) -> None: """Update the signal monitor line at the bottom of the display.""" # Save cursor, move to signal display area sys.stdout.write("\033[s") # save cursor if sig is None: sys.stdout.write("\r\n Signal: -- no data --\033[K") else: snr_db = sig["snr_db"] agc1 = sig["agc1"] locked = sig["locked"] power_db = sig["power_db"] pct = sig["snr_pct"] lock_str = "LOCK" if locked else "----" bar = signal_bar(pct, width=25) drive_str = "" if state["driving"]: elapsed = time.time() - state["drive_start"] remaining = CONTINUOUS_DRIVE_TIMEOUT - elapsed drive_str = f" [{state['driving'].upper()} {remaining:.0f}s]" line = (f"\r [{lock_str}] SNR {snr_db:5.1f} dB " f"AGC {agc1:5d} " f"Pwr {power_db:5.1f} dB " f"{bar}{drive_str}") sys.stdout.write(f"\n{line}\033[K") sys.stdout.write("\033[u") # restore cursor sys.stdout.flush() def _status_line(msg: str) -> None: """Write a status message on a dedicated line.""" sys.stdout.write(f"\r > {msg}\033[K\r\n") sys.stdout.flush() # -- CLI -- def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="motor.py", description="Genpix SkyWalker-1 DiSEqC 1.2 motor control", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""\ examples: %(prog)s halt %(prog)s east --steps 10 %(prog)s west %(prog)s goto 3 %(prog)s store 3 %(prog)s gotox --sat -97.5 --lon -96.8 %(prog)s gotox --sat -97.5 --lon -96.8 --lat 32.7 %(prog)s limit east %(prog)s nolimits %(prog)s raw E0 31 6B 01 %(prog)s interactive interactive mode controls: Arrow left/right jog west/east (continuous drive) Space halt motor 1-9 go to stored position s + 1-9 store position to slot g USALS GotoX prompt q quit safety: Continuous drive auto-halts after 30 seconds. Motor is halted on exit (including unexpected termination). """) parser.add_argument('-v', '--verbose', action='store_true', help="Show raw USB traffic") sub = parser.add_subparsers(dest='command') # halt sub.add_parser('halt', help="Stop motor movement") # east p_east = sub.add_parser('east', help="Drive motor east") p_east.add_argument('--steps', type=int, default=0, help="Number of steps (0=continuous, 1-127)") # west p_west = sub.add_parser('west', help="Drive motor west") p_west.add_argument('--steps', type=int, default=0, help="Number of steps (0=continuous, 1-127)") # goto p_goto = sub.add_parser('goto', help="Go to stored position") p_goto.add_argument('slot', type=int, help="Position slot (0=reference, 1-255)") # store p_store = sub.add_parser('store', help="Store current position") p_store.add_argument('slot', type=int, help="Position slot to store to (1-255)") # gotox p_gotox = sub.add_parser('gotox', help="USALS GotoX (automatic positioning)") p_gotox.add_argument('--sat', type=float, required=True, help="Satellite longitude (negative=west, e.g. -97.5)") p_gotox.add_argument('--lon', type=float, required=True, help="Observer longitude (negative=west)") p_gotox.add_argument('--lat', type=float, default=0.0, help="Observer latitude (default: 0.0)") # limit p_limit = sub.add_parser('limit', help="Set software travel limit") p_limit.add_argument('direction', choices=['east', 'west'], help="Limit direction") # nolimits sub.add_parser('nolimits', help="Disable software travel limits") # raw p_raw = sub.add_parser('raw', help="Send raw DiSEqC bytes") p_raw.add_argument('bytes', nargs='+', metavar='HH', help="Hex bytes (e.g. E0 31 6B 01)") # interactive sub.add_parser('interactive', help="Keyboard-driven jog controller") return parser def main(): parser = build_parser() args = parser.parse_args() if not args.command: parser.print_help() sys.exit(0) dispatch = { 'halt': cmd_halt, 'east': cmd_east, 'west': cmd_west, 'goto': cmd_goto, 'store': cmd_store, 'gotox': cmd_gotox, 'limit': cmd_limit, 'nolimits': cmd_nolimits, 'raw': cmd_raw, 'interactive': cmd_interactive, } handler = dispatch.get(args.command) if handler is None: parser.print_help() sys.exit(1) with SkyWalker1(verbose=args.verbose) as sw: # Boot demodulator and enable LNB power for DiSEqC sw.ensure_booted() sw.start_intersil(on=True) handler(sw, args) if __name__ == '__main__': main()