tuikit/driver/curses.py
author Radek Brich <radek.brich@devl.cz>
Wed, 03 Sep 2014 21:56:20 +0200
changeset 113 6796adfdc7eb
parent 109 105b1affc3c2
parent 106 abcadb7e2ef1
permissions -rw-r--r--
Merge. Due to my schizophrenia, I've accidentally forked my own code. The other set of changes was found on another computer.

import curses.ascii
import math
import logging

from tuikit.driver.driver import Driver
from tuikit.core.coords import Point


class CursesDriver(Driver):

    # Raw input byte sequences recognized by _process_control_chars().
    # Each entry is one or more byte values followed by the key name, which
    # is always the last element. One sequence may be a prefix of another
    # (plain escape vs. the ESC-prefixed sequences). The names 'CSI' and
    # 'mouse' are not keys: they hand parsing of the following bytes over to
    # _process_control_sequence() and _process_xterm_mouse() respectively.
    key_codes = (
        (0x09,                      'tab'           ),
        (0x0a,                      'enter'         ),
        (0x7f,                      'backspace'     ),
        (0x1b,                      'escape'        ),
        (0x1b, 0x4f, 0x50,          'f1'            ),  # xterm
        (0x1b, 0x4f, 0x51,          'f2'            ),  # xterm
        (0x1b, 0x4f, 0x52,          'f3'            ),  # xterm
        (0x1b, 0x4f, 0x53,          'f4'            ),  # xterm
        (0x1b, 0x5b,                'CSI'           ),  # see csi_codes
        (0x1b, 0x5b, 0x4d,          'mouse'         ),
    )

    # http://en.wikipedia.org/wiki/ANSI_escape_code
    #
    # Sequences that follow the CSI introducer (0x1b 0x5b), matched by
    # _process_control_sequence(). Each entry is
    #   (byte[, byte...], first numeric parameter, key name)
    # where the bytes are those that come after any numeric parameters.
    # A sequence without parameters is treated as having parameter 1.
    # A key name written as "mod+key" (e.g. 'shift+tab') is reported as `key`
    # with `mod` added to the modifier set.
    csi_codes = (
        # code            param     key name
        (0x7e,              1,      'home'          ),  # linux
        (0x7e,              2,      'insert'        ),
        (0x7e,              3,      'delete'        ),
        (0x7e,              4,      'end'           ),  # linux
        (0x7e,              5,      'pageup'        ),
        (0x7e,              6,      'pagedown'      ),
        (0x7e,              15,     'f5'            ),
        (0x7e,              17,     'f6'            ),
        (0x7e,              18,     'f7'            ),
        (0x7e,              19,     'f8'            ),
        (0x7e,              20,     'f9'            ),
        (0x7e,              21,     'f10'           ),
        (0x7e,              23,     'f11'           ),
        (0x7e,              24,     'f12'           ),
        (0x41,              1,      'up'            ),
        (0x42,              1,      'down'          ),
        (0x43,              1,      'right'         ),
        (0x44,              1,      'left'          ),
        (0x46,              1,      'end'           ),  # xterm
        (0x48,              1,      'home'          ),  # xterm
        (0x5a,              1,      'shift+tab'     ),  # xterm
        (0x5b, 0x41,        1,      'f1'            ),  # linux
        (0x5b, 0x42,        1,      'f2'            ),  # linux
        (0x5b, 0x43,        1,      'f3'            ),  # linux
        (0x5b, 0x44,        1,      'f4'            ),  # linux
        (0x5b, 0x45,        1,      'f5'            ),  # linux
    )

    # Maps color names to (curses color number, extra attribute).
    # setattr() ORs the extra attribute (A_BOLD for the bright variants) into
    # the result only for the foreground color; for the background color the
    # second element is ignored. 'default' uses color number -1.
    color_map = {
        'default':      (-1,                    0),
        'black':        (curses.COLOR_BLACK,    0),
        'blue':         (curses.COLOR_BLUE,     0),
        'green':        (curses.COLOR_GREEN,    0),
        'cyan':         (curses.COLOR_CYAN,     0),
        'red':          (curses.COLOR_RED,      0),
        'magenta':      (curses.COLOR_MAGENTA,  0),
        'brown':        (curses.COLOR_YELLOW,   0),
        'lightgray':    (curses.COLOR_WHITE,    0),
        'gray':         (curses.COLOR_BLACK,    curses.A_BOLD),
        'lightblue':    (curses.COLOR_BLUE,     curses.A_BOLD),
        'lightgreen':   (curses.COLOR_GREEN,    curses.A_BOLD),
        'lightcyan':    (curses.COLOR_CYAN,     curses.A_BOLD),
        'lightred':     (curses.COLOR_RED,      curses.A_BOLD),
        'lightmagenta': (curses.COLOR_MAGENTA,  curses.A_BOLD),
        'yellow':       (curses.COLOR_YELLOW,   curses.A_BOLD),
        'white':        (curses.COLOR_WHITE,    curses.A_BOLD),
    }

    # Maps attribute names to curses attribute bits; _parseattrs() ORs the
    # bits of all requested names together.
    attr_map = {
        'bold':         curses.A_BOLD,
        'underline':    curses.A_UNDERLINE,
        'standout':     curses.A_STANDOUT,  # inverse bg/fg
        'blink':        curses.A_BLINK,
    }

    def __init__(self):
        """Set up empty driver state; curses itself is started by init()."""
        Driver.__init__(self)
        self._log = logging.getLogger(__name__)

        # curses window (assigned in init()) and cursor position;
        # flush() hides the cursor while `cursor` is not set
        self.stdscr = self.cursor = None

        # attribute caches
        self.colors = {}      # attr_desc -> curses attribute value
        self.colorpairs = {}  # (fg, bg) color numbers -> curses color pair number
        self.colorstack = []  # pushcolor/popcolor puts or gets attributes from this

        # input state
        self.inputqueue = []  # pending input codes; the next one is at the end
        self.mbtnstack = []   # buttons currently held down (xterm mouse protocol)
        self._mouse_last_pos = self._mouse_last_bstate = None  # Point / button bits

    ## initialization, finalization ##

    def init(self):
        """Initialize curses.

        Starts curses mode, enables colors (including the terminal default
        color), switches off echo and line buffering, stores the screen
        size in `self.size`, hides the cursor and requests mouse events.

        """
        self.stdscr = curses.initscr()
        curses.start_color()
        curses.use_default_colors()
        curses.noecho()
        curses.cbreak()
        # keypad mode stays off: escape sequences are decoded by this class
        # (see key_codes / csi_codes), not by curses
        self.stdscr.keypad(0)
        # no implicit refresh on every change; the screen is updated in flush()
        self.stdscr.immedok(0)

        self.size.h, self.size.w = self.stdscr.getmaxyx()

        curses.curs_set(False)  # hide cursor
        curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
        curses.mouseinterval(0)  # do not wait to detect clicks, we use only press/release

    def close(self):
        self.stdscr.keypad(0)
        curses.echo()
        curses.nocbreak()
        curses.endwin()

    ## drawing ##

    def clear(self):
        """Erase the contents of the curses window."""
        self.stdscr.erase()

    def putch(self, ch, x, y):
        try:
            if isinstance(ch, int):
                self.stdscr.addch(y, x, ch)
            elif isinstance(ch, str) and len(ch) == 1:
                self.stdscr.addstr(y, x, ch)
            else:
                raise TypeError('Integer or one-char string is required.')
        except curses.error as e:
            self._log.exception('putch(%r, %s, %s) error:' % (ch, x, y))

    def draw(self, buffer, x=0, y=0):
        for bufy in range(buffer.size.h):
            for bufx in range(buffer.size.w):
                char, attr_desc = buffer.get(bufx, bufy)
                self.setattr(attr_desc)
                self.putch(char, x + bufx, y + bufy)

    def flush(self):
        """Refresh the screen and update the terminal cursor.

        When `self.cursor` is set, the cursor is moved there and shown;
        otherwise it is hidden.

        """
        cursor = self.cursor
        visible = bool(cursor)
        if visible:
            self.stdscr.move(cursor.y, cursor.x)
        curses.curs_set(visible)
        self.stdscr.refresh()

    ## colors, attributes ##

    def setattr(self, attr_desc):
        """Set attribute to be used for subsequent draw operations."""
        attr = self.colors.get(attr_desc, None)
        if attr is None:
            # first encountered `attr_desc`, initialize
            fg, bg, attrs = self._parse_attr_desc(attr_desc)
            fgcol, fgattr = self.color_map[fg]
            bgcol, _bgattr = self.color_map[bg]
            colpair = self._getcolorpair(fgcol, bgcol)
            attr = curses.color_pair(colpair) | self._parseattrs(attrs) | fgattr
            self.colors[attr_desc] = attr
        self.stdscr.attrset(attr)

    def _getcolorpair(self, fgcol, bgcol):
        pair = (fgcol, bgcol)
        if pair in self.colorpairs:
            return self.colorpairs[pair]
        num = len(self.colorpairs) + 1
        curses.init_pair(num, fgcol, bgcol)
        self.colorpairs[pair] = num
        return num

    def _parseattrs(self, attrs):
        res = 0
        for a in attrs:
            res = res | self.attr_map[a]
        return res

    ## input, events ##

    def getevents(self, timeout=None):
        """Process input, return list of events.

        timeout -- float, in seconds (None=infinite)

        Returns:
            [('event', param1, ...), ...]

        """
        # empty queue -> fill
        if len(self.inputqueue) == 0:
            if timeout is not None:
                timeout = math.ceil(timeout * 10)
            self._inputqueue_fill(timeout)

        res = []
        while len(self.inputqueue):
            c = self._inputqueue_get()

            if c == curses.KEY_MOUSE:
                res += self._process_mouse()

            elif c == curses.KEY_RESIZE:
                self.size.h, self.size.w = self.stdscr.getmaxyx()
                res.append(('resize', self.size.w, self.size.h))

            elif curses.ascii.isctrl(c):
                self._inputqueue_unget(c)
                res += self._process_control_chars()

            elif 192 <= c <= 255:
                self._inputqueue_unget(c)
                res += self._process_utf8_chars()

            elif curses.ascii.isprint(c):
                res += [('keypress', None, str(chr(c)), set())]

            else:
                self._inputqueue_unget(c)
                res += self._process_control_chars()

        return res

    def _inputqueue_fill(self, timeout=None):
        """Wait for curses input, add it to inputqueue.

        timeout -- int, tenths of second (None=infinite)

        """
        if timeout is None:
            # wait indefinitely
            c = self.stdscr.getch()
            self.inputqueue.insert(0, c)

        elif timeout > 0:
            # wait
            curses.halfdelay(timeout)
            c = self.stdscr.getch()
            curses.cbreak()
            if c == -1:
                return
            self.inputqueue.insert(0, c)

        # timeout = 0 -> no wait

        self.stdscr.nodelay(1)

        while True:
            c = self.stdscr.getch()
            if c == -1:
                break
            self.inputqueue.insert(0, c)

        self.stdscr.nodelay(0)

    def _inputqueue_get(self):
        c = None
        try:
            c = self.inputqueue.pop()
        except IndexError:
            pass
        return c

    def _inputqueue_get_wait(self):
        c = None
        while c is None:
            try:
                c = self.inputqueue.pop()
            except IndexError:
                curses.napms(25)
                self._inputqueue_fill(0)
        return c

    def _inputqueue_unget(self, c):
        """Put `c` back so that it is the next code taken from the queue."""
        self.inputqueue.append(c)

    def _process_mouse(self):
        try:
            _id, x, y, _z, bstate = curses.getmouse()
        except curses.error:
            return []

        pos = Point(x, y)
        out = []

        if bstate & curses.REPORT_MOUSE_POSITION:
            if self._mouse_last_pos != pos:
                if self._mouse_last_pos:
                    relpos = pos - self._mouse_last_pos
                    out += [('mousemove', 0, pos, relpos)]
                self._mouse_last_pos = pos

        # we are interested only in changes, not buttons already pressed before event
        if self._mouse_last_bstate is not None:
            old = self._mouse_last_bstate
            new = bstate
            bstate = ~old & new
            self._mouse_last_bstate = new
        else:
            self._mouse_last_bstate = bstate

        if bstate & curses.BUTTON1_PRESSED:
            out += [('mousedown', 1, pos)]
        if bstate & curses.BUTTON2_PRESSED:
            out += [('mousedown', 2, pos)]
        if bstate & curses.BUTTON3_PRESSED:
            out += [('mousedown', 3, pos)]
        if bstate & curses.BUTTON1_RELEASED:
            out += [('mouseup', 1, pos)]
        if bstate & curses.BUTTON2_RELEASED:
            out += [('mouseup', 2, pos)]
        if bstate & curses.BUTTON3_RELEASED:
            out += [('mouseup', 3, pos)]

        # reset last pos when pressed/released
        if len(out) > 0 and out[-1][0] in ('mousedown', 'mouseup'):
            self._mouse_last_pos = None

        return out

    def _process_utf8_chars(self):
        #FIXME read exact number of chars as defined by utf-8
        utf = []
        while len(utf) <= 6:
            c = self._inputqueue_get_wait()
            utf.append(c)
            try:
                uni = str(bytes(utf), 'utf-8')
                return [('keypress', None, uni, set())]
            except UnicodeDecodeError:
                continue
        raise Exception('Invalid UTF-8 sequence: %r' % utf)

    def _process_control_chars(self):
        codes = self.key_codes
        matchingcodes = []
        match = None
        consumed = []

        # consume next char, filter out matching codes
        c = self._inputqueue_get_wait()
        consumed.append(c)

        while True:
            #self.log.debug('c=%s len=%s', c, len(codes))
            for code in codes:
                if c == code[len(consumed)-1]:
                    if len(code) - 1 == len(consumed):
                        match = code
                    else:
                        matchingcodes += [code]

            #self.log.debug('matching=%s', len(matchingcodes))

            # match found, or no matching code found -> stop
            if len(matchingcodes) == 0:
                break

            # match found and some sequencies still match -> continue
            if len(matchingcodes) > 0:
                if len(self.inputqueue) == 0:
                    self._inputqueue_fill(1)

            c = self._inputqueue_get()
            if c:
                consumed.append(c)
                codes = matchingcodes
                matchingcodes = []
            else:
                break

        keyname = None
        if match:
            # compare match to consumed, return unused chars
            l = len(match) - 1
            while len(consumed) > l:
                self._inputqueue_unget(consumed[-1])
                del consumed[-1]
            keyname = match[-1]

        if match is None:
            self._log.debug('Unknown control sequence: %s',
                           ','.join(['0x%x' % x for x in consumed]))
            return [('keypress', 'Unknown', None, set())]

        if keyname == 'mouse':
            return self._process_xterm_mouse()

        if keyname == 'CSI':
            return self._process_control_sequence()

        return [('keypress', keyname, None, set())]

    def _process_xterm_mouse(self):
        """Decode an xterm mouse report (the three bytes after ESC [ M).

        The bytes are the event type followed by the x and y coordinates,
        each offset by 0x21.

        Returns a list with at most one of:
            ('mousedown', button, pos)
            ('mousemove', button, pos, relpos) -- repeated press report of
                                                  a button already held
            ('mouseup', button, pos)
            ('mousewheel', button, pos)        -- button 4 or 5

        A release report without any button recorded as pressed is ignored.
        Raises Exception for an unknown event type.

        """
        t = self._inputqueue_get_wait()
        x = self._inputqueue_get_wait() - 0x21
        y = self._inputqueue_get_wait() - 0x21
        pos = Point(x, y)

        out = []

        if t in (0x20, 0x21, 0x22):
            # button press
            btn = t - 0x1f
            if btn not in self.mbtnstack:
                self.mbtnstack.append(btn)
                self._mouse_last_pos = None
                out += [('mousedown', btn, pos)]
            elif self._mouse_last_pos != pos:
                # mouse move; the first position after a press only
                # becomes the reference point
                if self._mouse_last_pos:
                    relpos = pos - self._mouse_last_pos
                    out += [('mousemove', btn, pos, relpos)]
                self._mouse_last_pos = pos
        elif t == 0x23:
            # button release; the report does not say which button, so it
            # is taken from the stack of pressed buttons
            # None (not a tuple) is the "no last position" value tested
            # by the move handling here and in _process_mouse()
            self._mouse_last_pos = None
            if self.mbtnstack:
                btn = self.mbtnstack.pop()
                out += [('mouseup', btn, pos)]
            else:
                self._log.debug('Mouse release without a pressed button ignored')
        elif t in (0x60, 0x61):
            # wheel up, down
            btn = 4 + t - 0x60
            out += [('mousewheel', btn, pos)]
        else:
            raise Exception('Unknown mouse event: %x' % t)

        return out

    def _process_control_sequence(self):
        codes = self.csi_codes
        debug_seq = [0x1b, 0x5b]
        c = self._inputqueue_get_wait()
        debug_seq.append(c)

        # numeric parameters?
        params = []
        if chr(c).isdigit():
            params.append(chr(c))
            while True:
                c = self._inputqueue_get_wait()
                debug_seq.append(c)
                if chr(c).isdigit():
                    params[-1] += chr(c)
                elif chr(c) == ';':
                    params.append('')
                else:
                    break
        params = [int(x) for x in params]
        if len(params) == 0:
            params = [1]

        # filter codes using byte sequence
        while True:
            matching_codes = []
            for code in codes:
                if len(code) > 2 and code[0] == c:
                    matching_codes.append(code[1:])
            codes = matching_codes

            if len(codes) == 0:
                # no match -> unknown code
                seq = ','.join(['0x%x' % x for x in debug_seq])
                self._log.debug('Unknown control sequence: %s', seq)
                return [('keypress', 'Unknown:' + seq, None, set())]
            elif len(codes) == 1:
                # one match -> we got the winner
                break
            elif len(codes[0]) == 2:
                # more than one matching, but no more chars to check
                # will be sorted out using parameters
                break
            else:
                # more than one matching -> continue loop
                c = self._inputqueue_get_wait()
                debug_seq.append(c)

        # filter codes using first parameter
        matching_codes = []
        for code in codes:
            if params[0] == code[0] or params[0] is None:
                matching_codes.append(code)

        if len(matching_codes) == 0:
            # no match -> unknown code
            seq = ','.join(['0x%x' % x for x in debug_seq])
            self._log.debug('Unknown control sequence: %s', seq)
            return [('keypress', 'Unknown:' + seq, None, set())]

        if len(matching_codes) > 1:
            raise Exception('Internal error: invalid csi_codes, more than one matching')

        keyname = matching_codes[0][1]

        # modifiers
        mod_bits = 0
        if len(params) > 1:
            mod_bits = params[1] - 1

        # convert modifiers from bit-map to set
        mod_set = set()
        for bit, name in enumerate(('shift', 'alt', 'ctrl', 'meta')):
            if mod_bits & 1<<bit:
                mod_set.add(name)

        # parse keynames in form "shift+tab"
        if '+' in keyname:
            parts = keyname.split('+')
            for mod in parts[:-1]:
                assert(mod in ('shift', 'alt', 'ctrl', 'meta'))
                mod_set.add(mod)
            keyname = parts[-1]

        return [('keypress', keyname, None, mod_set)]


# Module-level alias of the driver implemented in this file
# (presumably looked up by the code that loads drivers -- TODO confirm).
driver_class = CursesDriver