# Merge.
# Due to my schizophrenia, I accidentally forked my own code. The other set of changes was found on another computer.
import curses.ascii
import math
import logging
from tuikit.driver.driver import Driver
from tuikit.core.coords import Point
class CursesDriver(Driver):

    """Terminal driver implemented on top of the curses library.

    Curses keypad translation is switched off in init(); escape sequences
    are decoded by this class using the `key_codes` and `csi_codes` tables.

    Events returned by getevents() are tuples:
        ('keypress', keyname, char, mod_set)
        ('mousedown', button, pos)
        ('mouseup', button, pos)
        ('mousemove', button, pos, relpos)
        ('mousewheel', button, pos)
        ('resize', width, height)

    """

    # Byte sequences recognized by _process_control_chars().
    # Each entry is (byte, ..., key name).
    key_codes = (
        (0x09,                      'tab'),
        (0x0a,                      'enter'),
        (0x7f,                      'backspace'),
        (0x1b,                      'escape'),
        (0x1b, 0x4f, 0x50,          'f1'),  # xterm
        (0x1b, 0x4f, 0x51,          'f2'),  # xterm
        (0x1b, 0x4f, 0x52,          'f3'),  # xterm
        (0x1b, 0x4f, 0x53,          'f4'),  # xterm
        (0x1b, 0x5b,                'CSI'),  # see csi_codes
        (0x1b, 0x5b, 0x4d,          'mouse'),
    )

    # Sequences following the CSI prefix (ESC [), used by
    # _process_control_sequence().
    # http://en.wikipedia.org/wiki/ANSI_escape_code
    csi_codes = (
        # code        param   key name
        (0x7e,        1,      'home'),  # linux
        (0x7e,        2,      'insert'),
        (0x7e,        3,      'delete'),
        (0x7e,        4,      'end'),  # linux
        (0x7e,        5,      'pageup'),
        (0x7e,        6,      'pagedown'),
        (0x7e,        15,     'f5'),
        (0x7e,        17,     'f6'),
        (0x7e,        18,     'f7'),
        (0x7e,        19,     'f8'),
        (0x7e,        20,     'f9'),
        (0x7e,        21,     'f10'),
        (0x7e,        23,     'f11'),
        (0x7e,        24,     'f12'),
        (0x41,        1,      'up'),
        (0x42,        1,      'down'),
        (0x43,        1,      'right'),
        (0x44,        1,      'left'),
        (0x46,        1,      'end'),  # xterm
        (0x48,        1,      'home'),  # xterm
        (0x5a,        1,      'shift+tab'),  # xterm
        (0x5b, 0x41,  1,      'f1'),  # linux
        (0x5b, 0x42,  1,      'f2'),  # linux
        (0x5b, 0x43,  1,      'f3'),  # linux
        (0x5b, 0x44,  1,      'f4'),  # linux
        (0x5b, 0x45,  1,      'f5'),  # linux
    )

    # Maps color name to (curses color number, extra attribute).
    # The "light" variants reuse the base color with A_BOLD added.
    color_map = {
        'default':      (-1,                    0),
        'black':        (curses.COLOR_BLACK,    0),
        'blue':         (curses.COLOR_BLUE,     0),
        'green':        (curses.COLOR_GREEN,    0),
        'cyan':         (curses.COLOR_CYAN,     0),
        'red':          (curses.COLOR_RED,      0),
        'magenta':      (curses.COLOR_MAGENTA,  0),
        'brown':        (curses.COLOR_YELLOW,   0),
        'lightgray':    (curses.COLOR_WHITE,    0),
        'gray':         (curses.COLOR_BLACK,    curses.A_BOLD),
        'lightblue':    (curses.COLOR_BLUE,     curses.A_BOLD),
        'lightgreen':   (curses.COLOR_GREEN,    curses.A_BOLD),
        'lightcyan':    (curses.COLOR_CYAN,     curses.A_BOLD),
        'lightred':     (curses.COLOR_RED,      curses.A_BOLD),
        'lightmagenta': (curses.COLOR_MAGENTA,  curses.A_BOLD),
        'yellow':       (curses.COLOR_YELLOW,   curses.A_BOLD),
        'white':        (curses.COLOR_WHITE,    curses.A_BOLD),
    }

    # Maps attribute name to curses attribute bit.
    attr_map = {
        'bold':      curses.A_BOLD,
        'underline': curses.A_UNDERLINE,
        'standout':  curses.A_STANDOUT,  # inverse bg/fg
        'blink':     curses.A_BLINK,
    }

    def __init__(self):
        Driver.__init__(self)
        self._log = logging.getLogger(__name__)
        self.stdscr = None
        self.cursor = None
        self.colors = {}      # maps names to curses attributes
        self.colorpairs = {}  # maps tuple (fg,bg) to curses color_pair
        self.colorstack = []  # pushcolor/popcolor puts or gets attributes from this
        # New input is inserted at index 0 and consumed from the end (FIFO).
        self.inputqueue = []
        # Buttons currently held down, as reported by xterm mouse sequences.
        self.mbtnstack = []
        self._mouse_last_pos = None  # Point, or None when unknown
        self._mouse_last_bstate = None

    ## initialization, finalization ##

    def init(self):
        """Initialize curses."""
        self.stdscr = curses.initscr()
        curses.start_color()
        curses.use_default_colors()
        curses.noecho()
        curses.cbreak()
        # Keypad translation stays off; escape sequences are decoded here.
        self.stdscr.keypad(0)
        self.stdscr.immedok(0)
        self.size.h, self.size.w = self.stdscr.getmaxyx()
        curses.curs_set(False)  # hide cursor
        curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
        curses.mouseinterval(0)  # do not wait to detect clicks, we use only press/release

    def close(self):
        """Restore terminal settings and end curses mode.

        Does nothing when init() was never called.

        """
        if self.stdscr is None:
            return
        self.stdscr.keypad(0)
        curses.echo()
        curses.nocbreak()
        curses.endwin()

    ## drawing ##

    def clear(self):
        """Erase the whole screen."""
        self.stdscr.erase()

    def putch(self, ch, x, y):
        """Put character on the screen at position x, y.

        ch -- int (character code) or one-char string

        Raises TypeError for any other `ch`. A curses.error is logged,
        not raised.

        """
        try:
            if isinstance(ch, int):
                self.stdscr.addch(y, x, ch)
            elif isinstance(ch, str) and len(ch) == 1:
                self.stdscr.addstr(y, x, ch)
            else:
                raise TypeError('Integer or one-char string is required.')
        except curses.error:
            self._log.exception('putch(%r, %s, %s) error:', ch, x, y)

    def draw(self, buffer, x=0, y=0):
        """Draw all cells of `buffer`, its top-left corner placed at x, y."""
        for bufy in range(buffer.size.h):
            for bufx in range(buffer.size.w):
                char, attr_desc = buffer.get(bufx, bufy)
                self.setattr(attr_desc)
                self.putch(char, x + bufx, y + bufy)

    def flush(self):
        """Place or hide the cursor, then refresh the screen."""
        if self.cursor:
            self.stdscr.move(self.cursor.y, self.cursor.x)
            curses.curs_set(True)
        else:
            curses.curs_set(False)
        self.stdscr.refresh()

    ## colors, attributes ##

    def setattr(self, attr_desc):
        """Set attribute to be used for subsequent draw operations."""
        attr = self.colors.get(attr_desc, None)
        if attr is None:
            # first encountered `attr_desc`, initialize
            # (_parse_attr_desc is not defined in this class)
            fg, bg, attrs = self._parse_attr_desc(attr_desc)
            fgcol, fgattr = self.color_map[fg]
            bgcol, _bgattr = self.color_map[bg]
            colpair = self._getcolorpair(fgcol, bgcol)
            attr = curses.color_pair(colpair) | self._parseattrs(attrs) | fgattr
            self.colors[attr_desc] = attr
        self.stdscr.attrset(attr)

    def _getcolorpair(self, fgcol, bgcol):
        """Return color pair number for fgcol, bgcol; allocate it on first use."""
        pair = (fgcol, bgcol)
        if pair in self.colorpairs:
            return self.colorpairs[pair]
        num = len(self.colorpairs) + 1
        curses.init_pair(num, fgcol, bgcol)
        self.colorpairs[pair] = num
        return num

    def _parseattrs(self, attrs):
        """Combine attribute names from `attr_map` into one curses bit mask."""
        res = 0
        for a in attrs:
            res = res | self.attr_map[a]
        return res

    ## input, events ##

    def getevents(self, timeout=None):
        """Process input, return list of events.

        timeout -- float, in seconds (None=infinite)

        Returns:
            [('event', param1, ...), ...]

        """
        # empty queue -> fill
        if not self.inputqueue:
            if timeout is not None:
                # _inputqueue_fill() expects whole tenths of a second
                timeout = int(math.ceil(timeout * 10))
            self._inputqueue_fill(timeout)

        res = []
        while self.inputqueue:
            c = self._inputqueue_get()
            if c == curses.KEY_MOUSE:
                res += self._process_mouse()
            elif c == curses.KEY_RESIZE:
                self.size.h, self.size.w = self.stdscr.getmaxyx()
                res.append(('resize', self.size.w, self.size.h))
            elif curses.ascii.isctrl(c):
                self._inputqueue_unget(c)
                res += self._process_control_chars()
            elif 192 <= c <= 255:
                # lead byte of a multi-byte UTF-8 sequence
                self._inputqueue_unget(c)
                res += self._process_utf8_chars()
            elif curses.ascii.isprint(c):
                res.append(('keypress', None, chr(c), set()))
            else:
                # anything else (e.g. 0x7f) goes through the key_codes table
                self._inputqueue_unget(c)
                res += self._process_control_chars()
        return res

    def _inputqueue_fill(self, timeout=None):
        """Wait for curses input, add it to inputqueue.

        timeout -- int, tenths of second (None=infinite)

        After the first character arrives, everything else already
        available is read without waiting.

        """
        if timeout is None:
            # wait indefinitely
            c = self.stdscr.getch()
            if c == -1:
                # no character was read, do not queue the error value
                return
            self.inputqueue.insert(0, c)
        elif timeout > 0:
            # wait; halfdelay() accepts only 1..255 tenths of a second
            curses.halfdelay(min(int(timeout), 255))
            try:
                c = self.stdscr.getch()
            finally:
                # leave half-delay mode even if getch() was interrupted
                curses.cbreak()
            if c == -1:
                return
            self.inputqueue.insert(0, c)

        # timeout = 0 -> no wait
        self.stdscr.nodelay(1)
        try:
            while True:
                c = self.stdscr.getch()
                if c == -1:
                    break
                self.inputqueue.insert(0, c)
        finally:
            self.stdscr.nodelay(0)

    def _inputqueue_get(self):
        """Return next character from inputqueue, or None if it is empty."""
        if not self.inputqueue:
            return None
        return self.inputqueue.pop()

    def _inputqueue_get_wait(self):
        """Return next character from inputqueue, poll for input if it is empty."""
        c = None
        while c is None:
            try:
                c = self.inputqueue.pop()
            except IndexError:
                curses.napms(25)
                self._inputqueue_fill(0)
        return c

    def _inputqueue_unget(self, c):
        """Return character to inputqueue, it will be the next one read."""
        self.inputqueue.append(c)

    def _process_mouse(self):
        """Convert mouse state reported by curses.getmouse() to events."""
        try:
            _id, x, y, _z, bstate = curses.getmouse()
        except curses.error:
            return []

        pos = Point(x, y)
        out = []

        if bstate & curses.REPORT_MOUSE_POSITION:
            if self._mouse_last_pos != pos:
                if self._mouse_last_pos is not None:
                    relpos = pos - self._mouse_last_pos
                    out += [('mousemove', 0, pos, relpos)]
                self._mouse_last_pos = pos

        # we are interested only in changes, not buttons already pressed before event
        if self._mouse_last_bstate is not None:
            old = self._mouse_last_bstate
            new = bstate
            bstate = ~old & new
            self._mouse_last_bstate = new
        else:
            self._mouse_last_bstate = bstate

        if bstate & curses.BUTTON1_PRESSED:
            out += [('mousedown', 1, pos)]
        if bstate & curses.BUTTON2_PRESSED:
            out += [('mousedown', 2, pos)]
        if bstate & curses.BUTTON3_PRESSED:
            out += [('mousedown', 3, pos)]
        if bstate & curses.BUTTON1_RELEASED:
            out += [('mouseup', 1, pos)]
        if bstate & curses.BUTTON2_RELEASED:
            out += [('mouseup', 2, pos)]
        if bstate & curses.BUTTON3_RELEASED:
            out += [('mouseup', 3, pos)]

        # reset last pos when pressed/released
        if len(out) > 0 and out[-1][0] in ('mousedown', 'mouseup'):
            self._mouse_last_pos = None

        return out

    def _process_utf8_chars(self):
        """Read one UTF-8 encoded character from inputqueue.

        The sequence length is taken from the lead byte, so no bytes
        beyond the character are consumed.

        Raises Exception when the bytes are not valid UTF-8.

        """
        lead = self._inputqueue_get_wait()
        if 0xc0 <= lead <= 0xdf:
            length = 2
        elif 0xe0 <= lead <= 0xef:
            length = 3
        elif 0xf0 <= lead <= 0xf7:
            length = 4
        else:
            # not a lead byte; decoding below fails and reports it
            length = 1

        utf = [lead]
        while len(utf) < length:
            utf.append(self._inputqueue_get_wait())

        try:
            uni = bytes(utf).decode('utf-8')
        except ValueError:
            # UnicodeDecodeError is a ValueError; bytes() also raises
            # ValueError for values outside 0..255
            raise Exception('Invalid UTF-8 sequence: %r' % utf)
        return [('keypress', None, uni, set())]

    def _process_control_chars(self):
        """Match input against `key_codes`, return resulting events.

        The longest matching sequence wins. Characters consumed beyond
        the match are returned to inputqueue.

        """
        codes = self.key_codes
        match = None
        consumed = []

        # consume next char, filter out matching codes
        c = self._inputqueue_get_wait()
        consumed.append(c)

        while True:
            matchingcodes = []
            for code in codes:
                if c == code[len(consumed) - 1]:
                    if len(code) - 1 == len(consumed):
                        match = code
                    else:
                        matchingcodes.append(code)

            # match found, or no matching code found -> stop
            if not matchingcodes:
                break

            # some longer sequences still match -> continue,
            # give the rest of the sequence a short time to arrive
            if not self.inputqueue:
                self._inputqueue_fill(1)
            c = self._inputqueue_get()
            if c is None:
                break
            consumed.append(c)
            codes = matchingcodes

        if match is None:
            self._log.debug('Unknown control sequence: %s',
                            ','.join(['0x%x' % x for x in consumed]))
            return [('keypress', 'Unknown', None, set())]

        # compare match to consumed, return unused chars
        matchlen = len(match) - 1
        while len(consumed) > matchlen:
            self._inputqueue_unget(consumed.pop())
        keyname = match[-1]

        if keyname == 'mouse':
            return self._process_xterm_mouse()

        if keyname == 'CSI':
            return self._process_control_sequence()

        return [('keypress', keyname, None, set())]

    def _process_xterm_mouse(self):
        """Read the three bytes following ESC [ M, return mouse events.

        Raises Exception for an unrecognized event byte.

        """
        t = self._inputqueue_get_wait()
        # coordinates are sent with offset 0x21
        x = self._inputqueue_get_wait() - 0x21
        y = self._inputqueue_get_wait() - 0x21
        pos = Point(x, y)

        out = []

        if t in (0x20, 0x21, 0x22):
            # button press
            btn = t - 0x1f
            if btn not in self.mbtnstack:
                self.mbtnstack.append(btn)
                self._mouse_last_pos = None
                out += [('mousedown', btn, pos)]
            else:
                # mouse move (button is already held down)
                if self._mouse_last_pos != pos:
                    if self._mouse_last_pos is not None:
                        relpos = pos - self._mouse_last_pos
                        out += [('mousemove', btn, pos, relpos)]
                self._mouse_last_pos = pos
        elif t == 0x23:
            # button release; ignored when no press was recorded
            if self.mbtnstack:
                btn = self.mbtnstack.pop()
                self._mouse_last_pos = None
                out += [('mouseup', btn, pos)]
        elif t in (0x60, 0x61):
            # wheel up, down
            btn = 4 + t - 0x60
            out += [('mousewheel', btn, pos)]
        else:
            raise Exception('Unknown mouse event: %x' % t)

        return out

    def _unknown_csi(self, debug_seq):
        """Log unknown control sequence, return 'Unknown:...' keypress event."""
        seq = ','.join(['0x%x' % x for x in debug_seq])
        self._log.debug('Unknown control sequence: %s', seq)
        return [('keypress', 'Unknown:' + seq, None, set())]

    def _process_control_sequence(self):
        """Parse the rest of a CSI sequence (after ESC [), return events.

        Raises Exception when `csi_codes` contains ambiguous entries.

        """
        codes = self.csi_codes
        debug_seq = [0x1b, 0x5b]
        c = self._inputqueue_get_wait()
        debug_seq.append(c)

        # numeric parameters? (ASCII digits separated by ';')
        raw_params = []
        if 0x30 <= c <= 0x39:
            raw_params.append(chr(c))
            while True:
                c = self._inputqueue_get_wait()
                debug_seq.append(c)
                if 0x30 <= c <= 0x39:
                    raw_params[-1] += chr(c)
                elif c == 0x3b:
                    raw_params.append('')
                else:
                    break
        # empty or missing parameter is treated as 1
        params = [int(p) if p else 1 for p in raw_params] or [1]

        # filter codes using byte sequence
        while True:
            codes = [code[1:] for code in codes
                     if len(code) > 2 and code[0] == c]
            if not codes:
                # no match -> unknown code
                return self._unknown_csi(debug_seq)
            if len(codes[0]) == 2:
                # no more chars to check, only (param, key name) is left;
                # will be sorted out using parameters
                break
            # more chars to check -> continue loop
            c = self._inputqueue_get_wait()
            debug_seq.append(c)

        # filter codes using first parameter
        matching_codes = [code for code in codes if params[0] == code[0]]
        if not matching_codes:
            # no match -> unknown code
            return self._unknown_csi(debug_seq)
        if len(matching_codes) > 1:
            raise Exception('Internal error: invalid csi_codes, more than one matching')
        keyname = matching_codes[0][1]

        # modifiers (second parameter is bit-map plus one)
        mod_bits = 0
        if len(params) > 1:
            mod_bits = max(params[1] - 1, 0)

        # convert modifiers from bit-map to set
        mod_set = set()
        for bit, name in enumerate(('shift', 'alt', 'ctrl', 'meta')):
            if mod_bits & 1 << bit:
                mod_set.add(name)

        # parse keynames in form "shift+tab"
        if '+' in keyname:
            parts = keyname.split('+')
            for mod in parts[:-1]:
                assert mod in ('shift', 'alt', 'ctrl', 'meta')
                mod_set.add(mod)
            keyname = parts[-1]

        return [('keypress', keyname, None, mod_set)]
# Module-level alias for the driver class defined in this file.
# NOTE(review): presumably looked up by name by whatever loads driver
# modules -- confirm against the loader.
driver_class = CursesDriver