--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tuikit/driver/curses.py Sat Mar 15 11:05:12 2014 +0100
@@ -0,0 +1,559 @@
+# -*- coding: utf-8 -*-
+
+import curses.ascii
+import math
+import logging
+
+from tuikit.driver.driver import Driver
+
+
+class CursesDriver(Driver):
+ key_codes = (
+ (0x09, 'tab' ),
+ (0x0a, 'enter' ),
+ (0x7f, 'backspace' ),
+ (0x1b, 'escape' ),
+ (0x1b,0x4f,0x50, 'f1' ), # xterm
+ (0x1b,0x4f,0x51, 'f2' ), # xterm
+ (0x1b,0x4f,0x52, 'f3' ), # xterm
+ (0x1b,0x4f,0x53, 'f4' ), # xterm
+ (0x1b,0x5b, 'CSI' ), # see csi_codes
+ (0x1b,0x5b,0x4d, 'mouse' ),
+ )
+
+ # http://en.wikipedia.org/wiki/ANSI_escape_code
+ csi_codes = (
+ # code param key name
+ (0x7e, 1, 'home' ), # linux
+ (0x7e, 2, 'insert' ),
+ (0x7e, 3, 'delete' ),
+ (0x7e, 4, 'end' ), # linux
+ (0x7e, 5, 'pageup' ),
+ (0x7e, 6, 'pagedown' ),
+ (0x7e, 15, 'f5' ),
+ (0x7e, 17, 'f6' ),
+ (0x7e, 18, 'f7' ),
+ (0x7e, 19, 'f8' ),
+ (0x7e, 20, 'f9' ),
+ (0x7e, 21, 'f10' ),
+ (0x7e, 23, 'f11' ),
+ (0x7e, 24, 'f12' ),
+ (0x41, 1, 'up' ),
+ (0x42, 1, 'down' ),
+ (0x43, 1, 'right' ),
+ (0x44, 1, 'left' ),
+ (0x46, 1, 'end' ), # xterm
+ (0x48, 1, 'home' ), # xterm
+ (0x5b,0x41, 1, 'f1' ), # linux
+ (0x5b,0x42, 1, 'f2' ), # linux
+ (0x5b,0x43, 1, 'f3' ), # linux
+ (0x5b,0x44, 1, 'f4' ), # linux
+ (0x5b,0x45, 1, 'f5' ), # linux
+ )
+
+ color_map = {
+ 'black' : (curses.COLOR_BLACK, 0),
+ 'blue' : (curses.COLOR_BLUE, 0),
+ 'green' : (curses.COLOR_GREEN, 0),
+ 'cyan' : (curses.COLOR_CYAN, 0),
+ 'red' : (curses.COLOR_RED, 0),
+ 'magenta' : (curses.COLOR_MAGENTA,0),
+ 'brown' : (curses.COLOR_YELLOW, 0),
+ 'lightgray' : (curses.COLOR_WHITE, 0),
+ 'gray' : (curses.COLOR_BLACK, curses.A_BOLD),
+ 'lightblue' : (curses.COLOR_BLUE, curses.A_BOLD),
+ 'lightgreen' : (curses.COLOR_GREEN, curses.A_BOLD),
+ 'lightcyan' : (curses.COLOR_CYAN, curses.A_BOLD),
+ 'lightred' : (curses.COLOR_RED, curses.A_BOLD),
+ 'lightmagenta' : (curses.COLOR_MAGENTA,curses.A_BOLD),
+ 'yellow' : (curses.COLOR_YELLOW, curses.A_BOLD),
+ 'white' : (curses.COLOR_WHITE, curses.A_BOLD),
+ }
+
+ attr_map = {
+ 'bold' : curses.A_BOLD,
+ 'underline' : curses.A_UNDERLINE,
+ 'standout' : curses.A_STANDOUT, # inverse bg/fg
+ 'blink' : curses.A_BLINK,
+ }
+
+ def __init__(self):
+ '''Set driver attributes to default values.'''
+ Driver.__init__(self)
+ self.log = logging.getLogger('tuikit')
+ self.stdscr = None
+ self.cursor = None
+ self.colors = {} # maps names to curses attributes
+ self.colorpairs = {} # maps tuple (fg,bg) to curses color_pair
+ self.colorstack = [] # pushcolor/popcolor puts or gets attributes from this
+ self.inputqueue = []
+ self.mbtnstack = []
+ self._mouse_last_pos = (None, None)
+ self._mouse_last_bstate = None
+
+ ## initialization, finalization ##
+
+ def init(self):
+ """Initialize curses"""
+ self.stdscr = curses.initscr()
+ curses.start_color()
+ curses.noecho()
+ curses.cbreak()
+ self.stdscr.keypad(0)
+ self.stdscr.immedok(0)
+
+ self.size.h, self.size.w = self.stdscr.getmaxyx()
+
+ curses.curs_set(False) # hide cursor
+ curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
+ curses.mouseinterval(0) # do not wait to detect clicks, we use only press/release
+
+ def close(self):
+ self.stdscr.keypad(0)
+ curses.echo()
+ curses.nocbreak()
+ curses.endwin()
+
+ def __enter__(self):
+ self.init()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ ## drawing ##
+
+ def draw(self, buffer, x=0, y=0):
+ for bufy in range(buffer.size.h):
+ for bufx in range(buffer.size.w):
+ self.putch(x + bufx, y + bufy,
+ buffer.get(bufx, bufy)[0])
+
+
+ ## colors, attributes ##
+
+ def _parsecolor(self, name):
+ name = name.lower().strip()
+ return self.color_map[name]
+
+ def _getcolorpair(self, fg, bg):
+ pair = (fg, bg)
+ if pair in self.colorpairs:
+ return self.colorpairs[pair]
+ num = len(self.colorpairs) + 1
+ curses.init_pair(num, fg, bg)
+ self.colorpairs[pair] = num
+ return num
+
+ def _parseattrs(self, attrs):
+ res = 0
+ for a in attrs:
+ a = a.lower().strip()
+ res = res | self.attr_map[a]
+ return res
+
+ def defcolor(self, name, desc):
+ """Define color name."""
+ parts = desc.split(',')
+ fgbg = parts[0].split(' on ', 1)
+ fg = fgbg[0]
+ bg = fgbg[1:] and fgbg[1] or 'black'
+ attrs = parts[1:]
+ fg, fgattr = self._parsecolor(fg)
+ bg, _bgattr = self._parsecolor(bg)
+ col = self._getcolorpair(fg, bg)
+ attr = self._parseattrs(attrs)
+ self.colors[name] = curses.color_pair(col) | fgattr | attr
+
+ def setcolor(self, name):
+ """Set defined color. Previous color is forgotten."""
+ self.stdscr.attrset(self.colors[name])
+
+ def pushcolor(self, name):
+ # add prefix if such color is available
+ if len(self.colorprefix):
+ prefixname = self.colorprefix[-1] + name
+ if prefixname in self.colors:
+ name = prefixname
+ attr = self.colors[name]
+ self.stdscr.attrset(attr)
+ self.colorstack.append(attr)
+
+ def popcolor(self):
+ self.colorstack.pop()
+ if len(self.colorstack):
+ attr = self.colorstack[-1]
+ else:
+ attr = 0
+ self.stdscr.attrset(attr)
+
+
+ ## drawing ##
+
+ def putch(self, x, y, c):
+ if not self.clipstack.test(x, y):
+ return
+ try:
+ if isinstance(c, str) and len(c) == 1:
+ self.stdscr.addstr(y, x, c)
+ else:
+ self.stdscr.addch(y, x, c)
+ except curses.error:
+ pass
+
+ def erase(self):
+ self.stdscr.erase()
+
+ def commit(self):
+ if self.cursor:
+ self.stdscr.move(*self.cursor)
+ curses.curs_set(True)
+ else:
+ curses.curs_set(False)
+ self.stdscr.refresh()
+
+
+ ## cursor ##
+
+ def showcursor(self, x, y):
+ if self.clipstack.test(x, y):
+ self.cursor = (y, x)
+ else:
+ self.cursor = None
+
+ def hidecursor(self):
+ curses.curs_set(False)
+ self.cursor = None
+
+
+ ## input ##
+
+ def inputqueue_fill(self, timeout=None):
+ """Wait for curses input, add it to inputqueue.
+
+ timeout -- int, tenths of second (None=infinite)
+
+ """
+ if timeout is None:
+ # wait indefinitely
+ c = self.stdscr.getch()
+ self.inputqueue.insert(0, c)
+
+ elif timeout > 0:
+ # wait
+ curses.halfdelay(timeout)
+ c = self.stdscr.getch()
+ curses.cbreak()
+ if c == -1:
+ return
+ self.inputqueue.insert(0, c)
+
+ # timeout = 0 -> no wait
+
+ self.stdscr.nodelay(1)
+
+ while True:
+ c = self.stdscr.getch()
+ if c == -1:
+ break
+ self.inputqueue.insert(0, c)
+
+ self.stdscr.nodelay(0)
+
+
+ def inputqueue_top(self, num=0):
+ return self.inputqueue[-1-num]
+
+
+ def inputqueue_get(self):
+ c = None
+ try:
+ c = self.inputqueue.pop()
+ except IndexError:
+ pass
+ return c
+
+
+ def inputqueue_get_wait(self):
+ c = None
+ while c is None:
+ try:
+ c = self.inputqueue.pop()
+ except IndexError:
+ curses.napms(25)
+ self.inputqueue_fill(0)
+ return c
+
+
+ def inputqueue_unget(self, c):
+ self.inputqueue.append(c)
+
+
+ def getevents(self, timeout=None):
+ '''Process input, return list of events.
+
+ timeout -- float, in seconds
+
+ '''
+ # empty queue -> fill
+ if len(self.inputqueue) == 0:
+ if timeout is not None:
+ timeout = math.ceil(timeout * 10)
+ self.inputqueue_fill(timeout)
+
+ res = []
+ while len(self.inputqueue):
+ c = self.inputqueue_get()
+
+ if c == curses.KEY_MOUSE:
+ res += self.process_mouse()
+
+ elif c == curses.KEY_RESIZE:
+ self.size.h, self.size.w = self.stdscr.getmaxyx()
+ res.append(('resize',))
+
+ elif curses.ascii.isctrl(c):
+ self.inputqueue_unget(c)
+ res += self.process_control_chars()
+
+ elif c >= 192 and c <= 255:
+ self.inputqueue_unget(c)
+ res += self.process_utf8_chars()
+
+ elif curses.ascii.isprint(c):
+ res += [('keypress', None, str(chr(c)))]
+
+ else:
+ self.inputqueue_unget(c)
+ res += self.process_control_chars()
+
+ return res
+
+
+ def process_mouse(self):
+ try:
+ _id, x, y, _z, bstate = curses.getmouse()
+ except curses.error:
+ return []
+
+ out = []
+
+ if bstate & curses.REPORT_MOUSE_POSITION:
+ if self._mouse_last_pos != (x, y):
+ if self._mouse_last_pos[0] is not None:
+ relx = x - (self._mouse_last_pos[0] or 0)
+ rely = y - (self._mouse_last_pos[1] or 0)
+ out += [('mousemove', 0, x, y, relx, rely)]
+ self._mouse_last_pos = (x, y)
+
+ # we are interested only in changes, not buttons already pressed before event
+ if self._mouse_last_bstate is not None:
+ old = self._mouse_last_bstate
+ new = bstate
+ bstate = ~old & new
+ self._mouse_last_bstate = new
+ else:
+ self._mouse_last_bstate = bstate
+
+ if bstate & curses.BUTTON1_PRESSED:
+ out += [('mousedown', 1, x, y)]
+ if bstate & curses.BUTTON2_PRESSED:
+ out += [('mousedown', 2, x, y)]
+ if bstate & curses.BUTTON3_PRESSED:
+ out += [('mousedown', 3, x, y)]
+ if bstate & curses.BUTTON1_RELEASED:
+ out += [('mouseup', 1, x, y)]
+ if bstate & curses.BUTTON2_RELEASED:
+ out += [('mouseup', 2, x, y)]
+ if bstate & curses.BUTTON3_RELEASED:
+ out += [('mouseup', 3, x, y)]
+
+ # reset last pos when pressed/released
+ if len(out) > 0 and out[-1][0] in ('mousedown', 'mouseup'):
+ self._mouse_last_pos = (None, None)
+
+ return out
+
+
+ def process_utf8_chars(self):
+ #FIXME read exact number of chars as defined by utf-8
+ utf = []
+ while len(utf) <= 6:
+ c = self.inputqueue_get_wait()
+ utf.append(c)
+ try:
+ uni = str(bytes(utf), 'utf-8')
+ return [('keypress', None, uni)]
+ except UnicodeDecodeError:
+ continue
+ raise Exception('Invalid UTF-8 sequence: %r' % utf)
+
+
+ def process_control_chars(self):
+ codes = self.key_codes
+ matchingcodes = []
+ match = None
+ consumed = []
+
+ # consume next char, filter out matching codes
+ c = self.inputqueue_get_wait()
+ consumed.append(c)
+
+ while True:
+ #self.log.debug('c=%s len=%s', c, len(codes))
+ for code in codes:
+ if c == code[len(consumed)-1]:
+ if len(code) - 1 == len(consumed):
+ match = code
+ else:
+ matchingcodes += [code]
+
+ #self.log.debug('matching=%s', len(matchingcodes))
+
+ # match found, or no matching code found -> stop
+ if len(matchingcodes) == 0:
+ break
+
+ # match found and some sequencies still match -> continue
+ if len(matchingcodes) > 0:
+ if len(self.inputqueue) == 0:
+ self.inputqueue_fill(1)
+
+ c = self.inputqueue_get()
+ if c:
+ consumed.append(c)
+ codes = matchingcodes
+ matchingcodes = []
+ else:
+ break
+
+ keyname = None
+ if match:
+ # compare match to consumed, return unused chars
+ l = len(match) - 1
+ while len(consumed) > l:
+ self.inputqueue_unget(consumed[-1])
+ del consumed[-1]
+ keyname = match[-1]
+
+ if match is None:
+ self.log.debug('Unknown control sequence: %s',
+ ','.join(['0x%x'%x for x in consumed]))
+ return [('keypress', 'Unknown', None)]
+
+ if keyname == 'mouse':
+ return self.process_xterm_mouse()
+
+ if keyname == 'CSI':
+ return self.process_control_sequence()
+
+ return [('keypress', keyname, None)]
+
+
+ def process_xterm_mouse(self):
+ t = self.inputqueue_get_wait()
+ x = self.inputqueue_get_wait() - 0x21
+ y = self.inputqueue_get_wait() - 0x21
+
+ out = []
+
+ if t in (0x20, 0x21, 0x22):
+ # button press
+ btn = t - 0x1f
+ if not btn in self.mbtnstack:
+ self.mbtnstack.append(btn)
+ self._mouse_last_pos = (None, None)
+ out += [('mousedown', btn, x, y)]
+ else:
+ # mouse move
+ if self._mouse_last_pos != (x, y):
+ if self._mouse_last_pos[0] is not None:
+ relx = x - self._mouse_last_pos[0]
+ rely = y - self._mouse_last_pos[1]
+ out += [('mousemove', btn, x, y, relx, rely)]
+ self._mouse_last_pos = (x, y)
+ elif t == 0x23:
+ # button release
+ btn = self.mbtnstack.pop()
+ self._mouse_last_pos = (None, None)
+ out += [('mouseup', btn, x, y)]
+ elif t in (0x60, 0x61):
+ # wheel up, down
+ btn = 4 + t - 0x60
+ out += [('mousewheel', btn, x, y)]
+ else:
+ raise Exception('Unknown mouse event: %x' % t)
+
+ return out
+
+ def process_control_sequence(self):
+ codes = self.csi_codes
+ debug_seq = [0x1b, 0x5b]
+ c = self.inputqueue_get_wait()
+ debug_seq.append(c)
+
+ # numeric parameters?
+ params = []
+ if chr(c).isdigit():
+ params.append(chr(c))
+ while True:
+ c = self.inputqueue_get_wait()
+ debug_seq.append(c)
+ if chr(c).isdigit():
+ params[-1] += chr(c)
+ elif chr(c) == ';':
+ params.append('')
+ else:
+ break
+ params = [int(x) for x in params]
+ if len(params) == 0:
+ params = [1]
+
+ # filter codes using byte sequence
+ while True:
+ matching_codes = []
+ for code in codes:
+ if len(code) > 2 and code[0] == c:
+ matching_codes.append(code[1:])
+ codes = matching_codes
+
+ if len(codes) == 0:
+ # no match -> unknown code
+ seq = ','.join(['0x%x' % x for x in debug_seq])
+ self.log.debug('Unknown control sequence: %s', seq)
+ return [('keypress', 'Unknown:' + seq, None)]
+ elif len(codes) == 1:
+ # one match -> we got the winner
+ break
+ elif len(codes[0]) == 2:
+ # more than one matching, but no more chars to check
+ # will be sorted out using parameters
+ break
+ else:
+ # more than one matching -> continue loop
+ c = self.inputqueue_get_wait()
+ debug_seq.append(c)
+
+ # filter codes using first parameter
+ matching_codes = []
+ for code in codes:
+ if params[0] == code[0] or params[0] is None:
+ matching_codes.append(code)
+
+ if len(matching_codes) == 0:
+ # no match -> unknown code
+ seq = ','.join(['0x%x' % x for x in debug_seq])
+ self.log.debug('Unknown control sequence: %s', seq)
+ return [('keypress', 'Unknown:' + seq, None)]
+
+ if len(matching_codes) > 1:
+ raise Exception('Internal error: invalid csi_codes, more than one matching')
+
+ keyname = matching_codes[0][1]
+
+ # modifiers
+ mod = 0
+ if len(params) > 1:
+ mod = params[1] - 1
+
+ return [('keypress', keyname, None, mod)]
+