#!/usr/bin/python3
# coding: utf-8
# version: 1.0.0
# mac: first line should be #!/usr/local/bin/python3
# for mac users, download python3.6+ from https://www.python.org/downloads/ . Don't use brew.
# pip3 install tornado
# https://www.tornadoweb.org/en/stable/
# sample usage
# ./eggshell -ro . -rw . -c 'bash'
import sys
# Refuse to load on Windows before the imports below are attempted; termios
# and fcntl are Unix-only modules of the standard library.
_WINDOWS = sys.platform.startswith('win')
if _WINDOWS:
    raise AssertionError('eggshell is not compatible with Windows')
from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.websocket import websocket_connect
from tornado.process import Subprocess, CalledProcessError
from tornado.iostream import StreamClosedError
import json
import uuid
import urllib
import urllib.parse
import urllib.request
import termios
import struct
import fcntl
import subprocess
import argparse
import textwrap
import re
# Websocket endpoint used for publishing. The placeholders are, in order, the
# URL-quoted read-only GUID, the URL-quoted read/write GUID and the overwrite
# flag (0 or 1).
URL = 'wss://eggshell.pjy.us/ws/publish?readonly=%s&readwrite=%s&overwrite=%d'
# Viewer page printed to the user; %s is a read-only or a read/write GUID.
LISTEN = 'https://eggshell.pjy.us/v/%s'
# NOTE(review): not referenced anywhere in the visible part of this file.
COMMAND = 'bash'
class Client(object):
    """Run a command under script(1) and relay its terminal over a websocket.

    Output read from the command is sent to the server as JSON ``output``
    messages, followed by one ``finished`` message carrying the exit status.
    ``input`` and ``geometry`` messages received from the server are applied
    to the command.  Constructing a Client blocks until the session is over,
    because the constructor itself runs the IOLoop.
    """

    # Device path of the command's terminal; looked up on the first resize.
    pty = None
    # Exit status of the command; stays -1 until one is known.
    rc = -1

    def __init__(self, url, args):
        """Connect to *url*, spawn ``args.command`` and run the IOLoop.

        Args:
            url: websocket URL to publish to.
            args: parsed command line; ``command``, ``readonly``,
                ``readwrite`` and ``complete_internal`` are read from it.
        """
        self.url = url
        self.args = args
        self.ioloop = IOLoop.instance()
        self.ws = None
        # Starts the connection attempt; it completes once the loop runs.
        self.connect()
        # Retry every 20 seconds for as long as no websocket is open.
        PeriodicCallback(self.keep_alive, 20000).start()
        if sys.platform == 'darwin':
            # BSD script(1) expects the command as an argument vector, so a
            # quoted command line such as 'ls -l' has to go through a shell
            # to be parsed the same way as with the "-c" form used below.
            param = ['env', 'TERM=xterm-color', 'script', '-q', '/dev/null',
                     'sh', '-c', args.command]
        else:
            param = ['env', 'TERM=xterm-color', 'script', '-e', '-q',
                     '-c', args.command, '/dev/null']
        self.app = Subprocess(param,
                              stdout=Subprocess.STREAM,
                              stderr=Subprocess.STREAM,
                              stdin=Subprocess.STREAM)
        # not for Windows
        self.app.set_exit_callback(self.exit_callback)
        # Blocks until run() stops the loop.
        self.ioloop.start()

    def exit_callback(self, ret: int) -> None:
        """Record the exit status reported by tornado for the command."""
        self.rc = ret

    @gen.coroutine
    def connect(self):
        """Open the websocket, print the viewer URLs and start relaying.

        A failed attempt is only reported; ``self.ws`` stays None so that
        keep_alive() tries again later.
        """
        try:
            self.ws = yield websocket_connect(
                self.url, on_message_callback=self.incoming_message)
        except Exception as e:
            print("connection error:", e)
        else:
            if self.args.readonly != '':
                url = LISTEN % (self.args.readonly,)
                print("Read-only view: \x1B[1;32m%s\x1B[0m" % (url,))
            if self.args.readwrite != '':
                url = LISTEN % (self.args.readwrite,)
                print("Interactive view: \x1B[1;32m%s\x1B[0m" % (url,))
            self.run()

    def incoming_message(self, msg):
        """Handle one websocket message from the server.

        ``input`` messages are written to the command's stdin and
        ``geometry`` messages resize its terminal; anything else is printed.

        NOTE(review): nothing here checks that a read/write GUID was
        configured; presumably the server only forwards input coming from
        read/write viewers -- confirm.
        """
        if msg is None:
            # tornado reports a closed connection as a None message
            print('connection closed')
            return
        try:
            payload = json.loads(msg)
        except ValueError:
            payload = None
        if not isinstance(payload, dict) or 'command' not in payload:
            print('missing command')
            print(msg)
            return
        command = payload['command']
        if command == 'input':
            try:
                self.app.stdin.write(payload['data'].encode('iso-8859-1'))
            except StreamClosedError:
                # the command is gone already; there is nothing to feed
                pass
        elif command == 'geometry':
            # the message carries [cols, rows]
            self.setGeometry(payload['data'][1], payload['data'][0])
        else:
            print('unknown command')
            print(msg)

    def getTTY(self, pid):
        """Return the terminal device of the first child of *pid*.

        Returns:
            A path such as ``/dev/pts/3``, or None when ps(1) lists no child
            of *pid* that has a terminal (for example because the child has
            not been started yet).
        """
        if sys.platform == 'darwin':
            # This branch lists every process and filters on the parent pid.
            wanted = str(pid)
            listing = subprocess.run(['ps', '-Ao', 'pid,ppid,tty'],
                                     stdout=subprocess.PIPE,
                                     shell=False).stdout
            tty = ''
            for line in listing.decode('iso-8859-1').split("\n"):
                found = re.match(r'\s*(\d+)\s+(\d+)\s+(.*)$', line)
                if found is not None and found.group(2) == wanted:
                    tty = found.group(3).strip()
                    break
        else:
            listing = subprocess.run(['ps', '--ppid', str(pid), '-o', 'tty'],
                                     stdout=subprocess.PIPE,
                                     shell=False).stdout
            lines = listing.decode('iso-8859-1').split("\n")
            # lines[0] is the column header
            tty = lines[1].strip() if len(lines) > 1 else ''
        if not tty or tty.startswith('?'):
            # ps prints "?" / "??" for a process without a terminal
            return None
        return '/dev/' + tty

    def setGeometry(self, rows, cols):
        """Resize the command's terminal to *rows* x *cols* using stty(1).

        The request is dropped when the terminal device cannot be determined
        yet or when the values are not integers; a later geometry message
        tries again.
        """
        if self.pty is None:
            self.pty = self.getTTY(self.app.pid)
        if self.pty is None:
            return
        try:
            rows, cols = int(rows), int(cols)
        except (TypeError, ValueError):
            return
        # run command: stty -F /dev/pts/xyz rows 24 cols 80 (-f on macOS)
        device_flag = '-f' if sys.platform == 'darwin' else '-F'
        params = ['stty', device_flag, self.pty,
                  'rows', str(rows), 'cols', str(cols)]
        subprocess.run(params, stdout=subprocess.PIPE, shell=False)

    def asJson(self, param):
        """Serialise *param* to compact JSON (no spaces after separators)."""
        return json.dumps(param, separators=(',', ':'))

    @gen.coroutine
    def run(self):
        """Relay the command's output until it ends, then stop the IOLoop.

        Output is forwarded in chunks of up to 1024 bytes, decoded as
        iso-8859-1.  When the output stream closes, the exit status is
        collected and sent in a final ``finished`` message.
        """
        try:
            while True:
                chunk = yield self.app.stdout.read_bytes(1024, partial=True)
                payload = {
                    'command': 'output',
                    'data': chunk.decode('iso-8859-1'),
                }
                yield self.ws.write_message(self.asJson(payload))
        except StreamClosedError:
            pass
        # The output stream can close before tornado has reaped the child;
        # wait for the real status instead of reporting None.
        if self.app.returncode is None:
            self.rc = yield self.app.wait_for_exit(raise_error=False)
        else:
            self.rc = self.app.returncode
        payload = {
            'command': 'finished',
            'rc': self.rc
        }
        try:
            # Stop the loop only after the final message has been written.
            yield self.ws.write_message(self.asJson(payload))
        finally:
            self.ioloop.stop()

    def notify_internal(self):
        """Load the completion URL given on the command line.

        The placeholders ``[rc]``, ``[ro]`` and ``[rw]`` in the URL are
        replaced by the exit status and the two GUIDs.  The response body is
        read and discarded.
        """
        url = (self.args.complete_internal
               .replace('[rc]', str(self.rc))
               .replace('[ro]', self.args.readonly)
               .replace('[rw]', self.args.readwrite))
        print('Notification: %s' % url)
        with urllib.request.urlopen(url) as page:
            page.read()

    def keep_alive(self):
        """Periodic callback: start a new connection attempt if none is open."""
        if self.ws is None:
            self.connect()
# you choose the GUID number, or specify '.' for one to be generated for you
def make_parser():
    """Create the argparse parser for the eggshell command line.

    Options, in the order they are registered: -c/--command (required),
    -ro/--readonly GUID, -rw/--readwrite GUID, -ci/--complete-internal URL
    and the -o/--overwrite flag.

    Returns:
        The configured argparse.ArgumentParser.
    """
    notes = textwrap.dedent('''
        additional information:
        For information about GUIDs (aka UUID), please see
        https://eggshell.pjy.us/guid. Alternatively, you may
        specify a '.' and a random one will be generated for you.
        callback urls can contain the following placeholders:
        "[rc]" to include the process return code
        "[ro]" the read-only GUID
        "[rw]" the read/write GUID
        ''')
    cli = argparse.ArgumentParser(
        epilog=notes,
        # keep the line breaks of the epilog exactly as written above
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    option_table = (
        (('-c', '--command'),
         {'required': True,
          'help': 'the command to be executed. Enclose in quotes'}),
        (('-ro', '--readonly'),
         {'metavar': 'GUID',
          'help': 'a guid (or ".") for read-only access'}),
        (('-rw', '--readwrite'),
         {'metavar': 'GUID',
          'help': 'a guid (or ".") for read/write access'}),
        (('-ci', '--complete-internal'),
         {'metavar': 'URL',
          'help': 'url to load when finished. Enclose in quotes'}),
        (('-o', '--overwrite'),
         {'action': 'store_true',
          'help': 'overwrite any existing output'}),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli
if __name__ == "__main__":
    # Script entry point: parse the options, settle the two GUIDs, run the
    # session and finally load the completion URL if one was given.
    parser = make_parser()
    args = parser.parse_args()

    # Neither GUID requested: publish a generated read-only view only.
    if args.readonly is None and args.readwrite is None:
        args.readonly, args.readwrite = str(uuid.uuid4()), ''

    # '.' asks for a generated GUID; an omitted option becomes ''.
    for guid_attr in ('readonly', 'readwrite'):
        requested = getattr(args, guid_attr)
        if requested == '.':
            setattr(args, guid_attr, str(uuid.uuid4()))
        elif requested is None:
            setattr(args, guid_attr, '')

    publish_url = URL % (
        urllib.parse.quote(args.readonly),
        urllib.parse.quote(args.readwrite),
        1 if args.overwrite else 0,
    )
    # Constructing the client runs the whole session before returning.
    client = Client(publish_url, args)
    if args.complete_internal is not None:
        client.notify_internal()