diff -r 2bead23b1262 -r ebe732b9ef19 tuikit/driver/curses.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tuikit/driver/curses.py Sat Mar 15 11:05:12 2014 +0100 @@ -0,0 +1,559 @@ +# -*- coding: utf-8 -*- + +import curses.ascii +import math +import logging + +from tuikit.driver.driver import Driver + + +class CursesDriver(Driver): + key_codes = ( + (0x09, 'tab' ), + (0x0a, 'enter' ), + (0x7f, 'backspace' ), + (0x1b, 'escape' ), + (0x1b,0x4f,0x50, 'f1' ), # xterm + (0x1b,0x4f,0x51, 'f2' ), # xterm + (0x1b,0x4f,0x52, 'f3' ), # xterm + (0x1b,0x4f,0x53, 'f4' ), # xterm + (0x1b,0x5b, 'CSI' ), # see csi_codes + (0x1b,0x5b,0x4d, 'mouse' ), + ) + + # http://en.wikipedia.org/wiki/ANSI_escape_code + csi_codes = ( + # code param key name + (0x7e, 1, 'home' ), # linux + (0x7e, 2, 'insert' ), + (0x7e, 3, 'delete' ), + (0x7e, 4, 'end' ), # linux + (0x7e, 5, 'pageup' ), + (0x7e, 6, 'pagedown' ), + (0x7e, 15, 'f5' ), + (0x7e, 17, 'f6' ), + (0x7e, 18, 'f7' ), + (0x7e, 19, 'f8' ), + (0x7e, 20, 'f9' ), + (0x7e, 21, 'f10' ), + (0x7e, 23, 'f11' ), + (0x7e, 24, 'f12' ), + (0x41, 1, 'up' ), + (0x42, 1, 'down' ), + (0x43, 1, 'right' ), + (0x44, 1, 'left' ), + (0x46, 1, 'end' ), # xterm + (0x48, 1, 'home' ), # xterm + (0x5b,0x41, 1, 'f1' ), # linux + (0x5b,0x42, 1, 'f2' ), # linux + (0x5b,0x43, 1, 'f3' ), # linux + (0x5b,0x44, 1, 'f4' ), # linux + (0x5b,0x45, 1, 'f5' ), # linux + ) + + color_map = { + 'black' : (curses.COLOR_BLACK, 0), + 'blue' : (curses.COLOR_BLUE, 0), + 'green' : (curses.COLOR_GREEN, 0), + 'cyan' : (curses.COLOR_CYAN, 0), + 'red' : (curses.COLOR_RED, 0), + 'magenta' : (curses.COLOR_MAGENTA,0), + 'brown' : (curses.COLOR_YELLOW, 0), + 'lightgray' : (curses.COLOR_WHITE, 0), + 'gray' : (curses.COLOR_BLACK, curses.A_BOLD), + 'lightblue' : (curses.COLOR_BLUE, curses.A_BOLD), + 'lightgreen' : (curses.COLOR_GREEN, curses.A_BOLD), + 'lightcyan' : (curses.COLOR_CYAN, curses.A_BOLD), + 'lightred' : (curses.COLOR_RED, curses.A_BOLD), + 'lightmagenta' : (curses.COLOR_MAGENTA,curses.A_BOLD), + 'yellow' : (curses.COLOR_YELLOW, curses.A_BOLD), + 'white' : (curses.COLOR_WHITE, curses.A_BOLD), + } + + attr_map = { + 'bold' : curses.A_BOLD, + 'underline' : curses.A_UNDERLINE, + 'standout' : curses.A_STANDOUT, # inverse bg/fg + 'blink' : curses.A_BLINK, + } + + def __init__(self): + '''Set driver attributes to default values.''' + Driver.__init__(self) + self.log = logging.getLogger('tuikit') + self.stdscr = None + self.cursor = None + self.colors = {} # maps names to curses attributes + self.colorpairs = {} # maps tuple (fg,bg) to curses color_pair + self.colorstack = [] # pushcolor/popcolor puts or gets attributes from this + self.inputqueue = [] + self.mbtnstack = [] + self._mouse_last_pos = (None, None) + self._mouse_last_bstate = None + + ## initialization, finalization ## + + def init(self): + """Initialize curses""" + self.stdscr = curses.initscr() + curses.start_color() + curses.noecho() + curses.cbreak() + self.stdscr.keypad(0) + self.stdscr.immedok(0) + + self.size.h, self.size.w = self.stdscr.getmaxyx() + + curses.curs_set(False) # hide cursor + curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) + curses.mouseinterval(0) # do not wait to detect clicks, we use only press/release + + def close(self): + self.stdscr.keypad(0) + curses.echo() + curses.nocbreak() + curses.endwin() + + def __enter__(self): + self.init() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + ## drawing ## + + def draw(self, buffer, x=0, y=0): + for bufy in range(buffer.size.h): + for bufx in range(buffer.size.w): + self.putch(x + bufx, y + bufy, + buffer.get(bufx, bufy)[0]) + + + ## colors, attributes ## + + def _parsecolor(self, name): + name = name.lower().strip() + return self.color_map[name] + + def _getcolorpair(self, fg, bg): + pair = (fg, bg) + if pair in self.colorpairs: + return self.colorpairs[pair] + num = len(self.colorpairs) + 1 + curses.init_pair(num, fg, bg) + self.colorpairs[pair] = num + return num + + def _parseattrs(self, attrs): + res = 0 + for a in attrs: + a = a.lower().strip() + res = res | self.attr_map[a] + return res + + def defcolor(self, name, desc): + """Define color name.""" + parts = desc.split(',') + fgbg = parts[0].split(' on ', 1) + fg = fgbg[0] + bg = fgbg[1:] and fgbg[1] or 'black' + attrs = parts[1:] + fg, fgattr = self._parsecolor(fg) + bg, _bgattr = self._parsecolor(bg) + col = self._getcolorpair(fg, bg) + attr = self._parseattrs(attrs) + self.colors[name] = curses.color_pair(col) | fgattr | attr + + def setcolor(self, name): + """Set defined color. Previous color is forgotten.""" + self.stdscr.attrset(self.colors[name]) + + def pushcolor(self, name): + # add prefix if such color is available + if len(self.colorprefix): + prefixname = self.colorprefix[-1] + name + if prefixname in self.colors: + name = prefixname + attr = self.colors[name] + self.stdscr.attrset(attr) + self.colorstack.append(attr) + + def popcolor(self): + self.colorstack.pop() + if len(self.colorstack): + attr = self.colorstack[-1] + else: + attr = 0 + self.stdscr.attrset(attr) + + + ## drawing ## + + def putch(self, x, y, c): + if not self.clipstack.test(x, y): + return + try: + if isinstance(c, str) and len(c) == 1: + self.stdscr.addstr(y, x, c) + else: + self.stdscr.addch(y, x, c) + except curses.error: + pass + + def erase(self): + self.stdscr.erase() + + def commit(self): + if self.cursor: + self.stdscr.move(*self.cursor) + curses.curs_set(True) + else: + curses.curs_set(False) + self.stdscr.refresh() + + + ## cursor ## + + def showcursor(self, x, y): + if self.clipstack.test(x, y): + self.cursor = (y, x) + else: + self.cursor = None + + def hidecursor(self): + curses.curs_set(False) + self.cursor = None + + + ## input ## + + def inputqueue_fill(self, timeout=None): + """Wait for curses input, add it to inputqueue. + + timeout -- int, tenths of second (None=infinite) + + """ + if timeout is None: + # wait indefinitely + c = self.stdscr.getch() + self.inputqueue.insert(0, c) + + elif timeout > 0: + # wait + curses.halfdelay(timeout) + c = self.stdscr.getch() + curses.cbreak() + if c == -1: + return + self.inputqueue.insert(0, c) + + # timeout = 0 -> no wait + + self.stdscr.nodelay(1) + + while True: + c = self.stdscr.getch() + if c == -1: + break + self.inputqueue.insert(0, c) + + self.stdscr.nodelay(0) + + + def inputqueue_top(self, num=0): + return self.inputqueue[-1-num] + + + def inputqueue_get(self): + c = None + try: + c = self.inputqueue.pop() + except IndexError: + pass + return c + + + def inputqueue_get_wait(self): + c = None + while c is None: + try: + c = self.inputqueue.pop() + except IndexError: + curses.napms(25) + self.inputqueue_fill(0) + return c + + + def inputqueue_unget(self, c): + self.inputqueue.append(c) + + + def getevents(self, timeout=None): + '''Process input, return list of events. + + timeout -- float, in seconds + + ''' + # empty queue -> fill + if len(self.inputqueue) == 0: + if timeout is not None: + timeout = math.ceil(timeout * 10) + self.inputqueue_fill(timeout) + + res = [] + while len(self.inputqueue): + c = self.inputqueue_get() + + if c == curses.KEY_MOUSE: + res += self.process_mouse() + + elif c == curses.KEY_RESIZE: + self.size.h, self.size.w = self.stdscr.getmaxyx() + res.append(('resize',)) + + elif curses.ascii.isctrl(c): + self.inputqueue_unget(c) + res += self.process_control_chars() + + elif c >= 192 and c <= 255: + self.inputqueue_unget(c) + res += self.process_utf8_chars() + + elif curses.ascii.isprint(c): + res += [('keypress', None, str(chr(c)))] + + else: + self.inputqueue_unget(c) + res += self.process_control_chars() + + return res + + + def process_mouse(self): + try: + _id, x, y, _z, bstate = curses.getmouse() + except curses.error: + return [] + + out = [] + + if bstate & curses.REPORT_MOUSE_POSITION: + if self._mouse_last_pos != (x, y): + if self._mouse_last_pos[0] is not None: + relx = x - (self._mouse_last_pos[0] or 0) + rely = y - (self._mouse_last_pos[1] or 0) + out += [('mousemove', 0, x, y, relx, rely)] + self._mouse_last_pos = (x, y) + + # we are interested only in changes, not buttons already pressed before event + if self._mouse_last_bstate is not None: + old = self._mouse_last_bstate + new = bstate + bstate = ~old & new + self._mouse_last_bstate = new + else: + self._mouse_last_bstate = bstate + + if bstate & curses.BUTTON1_PRESSED: + out += [('mousedown', 1, x, y)] + if bstate & curses.BUTTON2_PRESSED: + out += [('mousedown', 2, x, y)] + if bstate & curses.BUTTON3_PRESSED: + out += [('mousedown', 3, x, y)] + if bstate & curses.BUTTON1_RELEASED: + out += [('mouseup', 1, x, y)] + if bstate & curses.BUTTON2_RELEASED: + out += [('mouseup', 2, x, y)] + if bstate & curses.BUTTON3_RELEASED: + out += [('mouseup', 3, x, y)] + + # reset last pos when pressed/released + if len(out) > 0 and out[-1][0] in ('mousedown', 'mouseup'): + self._mouse_last_pos = (None, None) + + return out + + + def process_utf8_chars(self): + #FIXME read exact number of chars as defined by utf-8 + utf = [] + while len(utf) <= 6: + c = self.inputqueue_get_wait() + utf.append(c) + try: + uni = str(bytes(utf), 'utf-8') + return [('keypress', None, uni)] + except UnicodeDecodeError: + continue + raise Exception('Invalid UTF-8 sequence: %r' % utf) + + + def process_control_chars(self): + codes = self.key_codes + matchingcodes = [] + match = None + consumed = [] + + # consume next char, filter out matching codes + c = self.inputqueue_get_wait() + consumed.append(c) + + while True: + #self.log.debug('c=%s len=%s', c, len(codes)) + for code in codes: + if c == code[len(consumed)-1]: + if len(code) - 1 == len(consumed): + match = code + else: + matchingcodes += [code] + + #self.log.debug('matching=%s', len(matchingcodes)) + + # match found, or no matching code found -> stop + if len(matchingcodes) == 0: + break + + # match found and some sequencies still match -> continue + if len(matchingcodes) > 0: + if len(self.inputqueue) == 0: + self.inputqueue_fill(1) + + c = self.inputqueue_get() + if c: + consumed.append(c) + codes = matchingcodes + matchingcodes = [] + else: + break + + keyname = None + if match: + # compare match to consumed, return unused chars + l = len(match) - 1 + while len(consumed) > l: + self.inputqueue_unget(consumed[-1]) + del consumed[-1] + keyname = match[-1] + + if match is None: + self.log.debug('Unknown control sequence: %s', + ','.join(['0x%x'%x for x in consumed])) + return [('keypress', 'Unknown', None)] + + if keyname == 'mouse': + return self.process_xterm_mouse() + + if keyname == 'CSI': + return self.process_control_sequence() + + return [('keypress', keyname, None)] + + + def process_xterm_mouse(self): + t = self.inputqueue_get_wait() + x = self.inputqueue_get_wait() - 0x21 + y = self.inputqueue_get_wait() - 0x21 + + out = [] + + if t in (0x20, 0x21, 0x22): + # button press + btn = t - 0x1f + if not btn in self.mbtnstack: + self.mbtnstack.append(btn) + self._mouse_last_pos = (None, None) + out += [('mousedown', btn, x, y)] + else: + # mouse move + if self._mouse_last_pos != (x, y): + if self._mouse_last_pos[0] is not None: + relx = x - self._mouse_last_pos[0] + rely = y - self._mouse_last_pos[1] + out += [('mousemove', btn, x, y, relx, rely)] + self._mouse_last_pos = (x, y) + elif t == 0x23: + # button release + btn = self.mbtnstack.pop() + self._mouse_last_pos = (None, None) + out += [('mouseup', btn, x, y)] + elif t in (0x60, 0x61): + # wheel up, down + btn = 4 + t - 0x60 + out += [('mousewheel', btn, x, y)] + else: + raise Exception('Unknown mouse event: %x' % t) + + return out + + def process_control_sequence(self): + codes = self.csi_codes + debug_seq = [0x1b, 0x5b] + c = self.inputqueue_get_wait() + debug_seq.append(c) + + # numeric parameters? + params = [] + if chr(c).isdigit(): + params.append(chr(c)) + while True: + c = self.inputqueue_get_wait() + debug_seq.append(c) + if chr(c).isdigit(): + params[-1] += chr(c) + elif chr(c) == ';': + params.append('') + else: + break + params = [int(x) for x in params] + if len(params) == 0: + params = [1] + + # filter codes using byte sequence + while True: + matching_codes = [] + for code in codes: + if len(code) > 2 and code[0] == c: + matching_codes.append(code[1:]) + codes = matching_codes + + if len(codes) == 0: + # no match -> unknown code + seq = ','.join(['0x%x' % x for x in debug_seq]) + self.log.debug('Unknown control sequence: %s', seq) + return [('keypress', 'Unknown:' + seq, None)] + elif len(codes) == 1: + # one match -> we got the winner + break + elif len(codes[0]) == 2: + # more than one matching, but no more chars to check + # will be sorted out using parameters + break + else: + # more than one matching -> continue loop + c = self.inputqueue_get_wait() + debug_seq.append(c) + + # filter codes using first parameter + matching_codes = [] + for code in codes: + if params[0] == code[0] or params[0] is None: + matching_codes.append(code) + + if len(matching_codes) == 0: + # no match -> unknown code + seq = ','.join(['0x%x' % x for x in debug_seq]) + self.log.debug('Unknown control sequence: %s', seq) + return [('keypress', 'Unknown:' + seq, None)] + + if len(matching_codes) > 1: + raise Exception('Internal error: invalid csi_codes, more than one matching') + + keyname = matching_codes[0][1] + + # modifiers + mod = 0 + if len(params) > 1: + mod = params[1] - 1 + + return [('keypress', keyname, None, mod)] +