tuikit/driver_curses.py
author Radek Brich <radek.brich@devl.cz>
Sun, 03 Feb 2013 16:38:41 +0100
changeset 77 fc1989059e19
parent 71 cfd3445107b4
child 82 2bead23b1262
permissions -rw-r--r--
Propagate "quit" event, do not just terminate application. Resize: flag widgets to be resized, do resizes only once before draw. Draw: flag widgets to be redrawn, do not draw everything on any event.

# -*- coding: utf-8 -*-

import curses.wrapper
import curses.ascii
import math
import logging

from tuikit.driver import Driver


class DriverCurses(Driver):
    key_codes = (
        (0x09,                      'tab'           ),
        (0x0a,                      'enter'         ),
        (0x7f,                      'backspace'     ),
        (0x1b,                      'escape'        ),
        (0x1b,0x4f,0x50,            'f1'            ),  # xterm
        (0x1b,0x4f,0x51,            'f2'            ),  # xterm
        (0x1b,0x4f,0x52,            'f3'            ),  # xterm
        (0x1b,0x4f,0x53,            'f4'            ),  # xterm
        (0x1b,0x5b,                 'CSI'           ),  # see csi_codes
        (0x1b,0x5b,0x4d,            'mouse'         ),
    )

    # http://en.wikipedia.org/wiki/ANSI_escape_code
    csi_codes = (
        # code            param     key name
        (0x7e,              1,      'home'          ),  # linux
        (0x7e,              2,      'insert'        ),
        (0x7e,              3,      'delete'        ),
        (0x7e,              4,      'end'           ),  # linux
        (0x7e,              5,      'pageup'        ),
        (0x7e,              6,      'pagedown'      ),
        (0x7e,              15,     'f5'            ),
        (0x7e,              17,     'f6'            ),
        (0x7e,              18,     'f7'            ),
        (0x7e,              19,     'f8'            ),
        (0x7e,              20,     'f9'            ),
        (0x7e,              21,     'f10'           ),
        (0x7e,              23,     'f11'           ),
        (0x7e,              24,     'f12'           ),
        (0x41,              1,      'up'            ),
        (0x42,              1,      'down'          ),
        (0x43,              1,      'right'         ),
        (0x44,              1,      'left'          ),
        (0x46,              1,      'end'           ),  # xterm
        (0x48,              1,      'home'          ),  # xterm
        (0x5b,0x41,         1,      'f1'            ),  # linux
        (0x5b,0x42,         1,      'f2'            ),  # linux
        (0x5b,0x43,         1,      'f3'            ),  # linux
        (0x5b,0x44,         1,      'f4'            ),  # linux
        (0x5b,0x45,         1,      'f5'            ),  # linux
    )

    color_map = {
        'black'     : (curses.COLOR_BLACK,  0),
        'blue'      : (curses.COLOR_BLUE,   0),
        'green'     : (curses.COLOR_GREEN,  0),
        'cyan'      : (curses.COLOR_CYAN,   0),
        'red'       : (curses.COLOR_RED,    0),
        'magenta'   : (curses.COLOR_MAGENTA,0),
        'brown'     : (curses.COLOR_YELLOW, 0),
        'lightgray' : (curses.COLOR_WHITE,  0),
        'gray'          : (curses.COLOR_BLACK,  curses.A_BOLD),
        'lightblue'     : (curses.COLOR_BLUE,   curses.A_BOLD),
        'lightgreen'    : (curses.COLOR_GREEN,  curses.A_BOLD),
        'lightcyan'     : (curses.COLOR_CYAN,   curses.A_BOLD),
        'lightred'      : (curses.COLOR_RED,    curses.A_BOLD),
        'lightmagenta'  : (curses.COLOR_MAGENTA,curses.A_BOLD),
        'yellow'        : (curses.COLOR_YELLOW, curses.A_BOLD),
        'white'         : (curses.COLOR_WHITE,  curses.A_BOLD),
    }

    attr_map = {
        'bold'      : curses.A_BOLD,
        'underline' : curses.A_UNDERLINE,
        'standout'  : curses.A_STANDOUT,  # inverse bg/fg
        'blink'     : curses.A_BLINK,
    }

    def __init__(self):
        '''Set driver attributes to default values.'''
        Driver.__init__(self)
        self.log = logging.getLogger('tuikit')
        self.screen = None
        self.cursor = None
        self.colors = {}     # maps names to curses attributes
        self.colorpairs = {} # maps tuple (fg,bg) to curses color_pair
        self.colorstack = [] # pushcolor/popcolor puts or gets attributes from this
        self.inputqueue = []
        self.mbtnstack = []
        self._mouse_last_pos = (None, None)
        self._mouse_last_bstate = None

    def init_curses(self):
        '''Initialize curses'''
        self.size.h, self.size.w = self.screen.getmaxyx()
        self.screen.immedok(0)
        self.screen.keypad(0)
        curses.curs_set(False)  # hide cursor
        curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
        curses.mouseinterval(0)  # do not wait to detect clicks, we use only press/release

    def start(self, mainfunc):
        def main(screen):
            self.screen = screen
            self.init_curses()
            mainfunc()
        curses.wrapper(main)


    ## colors, attributes ##

    def _parsecolor(self, name):
        name = name.lower().strip()
        return self.color_map[name]

    def _getcolorpair(self, fg, bg):
        pair = (fg, bg)
        if pair in self.colorpairs:
            return self.colorpairs[pair]
        num = len(self.colorpairs) + 1
        curses.init_pair(num, fg, bg)
        self.colorpairs[pair] = num
        return num

    def _parseattrs(self, attrs):
        res = 0
        for a in attrs:
            a = a.lower().strip()
            res = res | self.attr_map[a]
        return res

    def defcolor(self, name, desc):
        """Define color name."""
        parts = desc.split(',')
        fgbg = parts[0].split(' on ', 1)
        fg = fgbg[0]
        bg = fgbg[1:] and fgbg[1] or 'black'
        attrs = parts[1:]
        fg, fgattr = self._parsecolor(fg)
        bg, _bgattr = self._parsecolor(bg)
        col = self._getcolorpair(fg, bg)
        attr = self._parseattrs(attrs)
        self.colors[name] = curses.color_pair(col) | fgattr | attr

    def setcolor(self, name):
        """Set defined color. Previous color is forgotten."""
        self.screen.attrset(self.colors[name])

    def pushcolor(self, name):
        # add prefix if such color is available
        if len(self.colorprefix):
            prefixname = self.colorprefix[-1] + name
            if prefixname in self.colors:
                name = prefixname
        attr = self.colors[name]
        self.screen.attrset(attr)
        self.colorstack.append(attr)

    def popcolor(self):
        self.colorstack.pop()
        if len(self.colorstack):
            attr = self.colorstack[-1]
        else:
            attr = 0
        self.screen.attrset(attr)


    ## drawing ##

    def putch(self, x, y, c):
        if not self.clipstack.test(x, y):
            return
        try:
            if isinstance(c, str) and len(c) == 1:
                self.screen.addstr(y, x, c)
            else:
                self.screen.addch(y, x, c)
        except curses.error:
            pass

    def erase(self):
        self.screen.erase()

    def commit(self):
        if self.cursor:
            self.screen.move(*self.cursor)
            curses.curs_set(True)
        else:
            curses.curs_set(False)
        self.screen.refresh()


    ## cursor ##

    def showcursor(self, x, y):
        if self.clipstack.test(x, y):
            self.cursor = (y, x)
        else:
            self.cursor = None

    def hidecursor(self):
        curses.curs_set(False)
        self.cursor = None


    ## input ##

    def inputqueue_fill(self, timeout=None):
        """Wait for curses input, add it to inputqueue.

        timeout -- int, tenths of second (None=infinite)

        """
        if timeout is None:
            # wait indefinitely
            c = self.screen.getch()
            self.inputqueue.insert(0, c)

        elif timeout > 0:
            # wait
            curses.halfdelay(timeout)
            c = self.screen.getch()
            curses.cbreak()
            if c == -1:
                return
            self.inputqueue.insert(0, c)

        # timeout = 0 -> no wait

        self.screen.nodelay(1)

        while True:
            c = self.screen.getch()
            if c == -1:
                break
            self.inputqueue.insert(0, c)

        self.screen.nodelay(0)


    def inputqueue_top(self, num=0):
        return self.inputqueue[-1-num]


    def inputqueue_get(self):
        c = None
        try:
            c = self.inputqueue.pop()
        except IndexError:
            pass
        return c


    def inputqueue_get_wait(self):
        c = None
        while c is None:
            try:
                c = self.inputqueue.pop()
            except IndexError:
                curses.napms(25)
                self.inputqueue_fill(0)
        return c


    def inputqueue_unget(self, c):
        self.inputqueue.append(c)


    def getevents(self, timeout=None):
        '''Process input, return list of events.

        timeout -- float, in seconds

        '''
        # empty queue -> fill
        if len(self.inputqueue) == 0:
            if timeout is not None:
                timeout = math.ceil(timeout * 10)
            self.inputqueue_fill(timeout)

        res = []
        while len(self.inputqueue):
            c = self.inputqueue_get()

            if c == curses.KEY_MOUSE:
                res += self.process_mouse()

            elif c == curses.KEY_RESIZE:
                self.size.h, self.size.w = self.screen.getmaxyx()
                res.append(('resize',))

            elif curses.ascii.isctrl(c):
                self.inputqueue_unget(c)
                res += self.process_control_chars()

            elif c >= 192 and c <= 255:
                self.inputqueue_unget(c)
                res += self.process_utf8_chars()

            elif curses.ascii.isprint(c):
                res += [('keypress', None, str(chr(c)))]

            else:
                self.inputqueue_unget(c)
                res += self.process_control_chars()

        return res


    def process_mouse(self):
        try:
            _id, x, y, _z, bstate = curses.getmouse()
        except curses.error:
            return []

        out = []

        if bstate & curses.REPORT_MOUSE_POSITION:
            if self._mouse_last_pos != (x, y):
                if self._mouse_last_pos[0] is not None:
                    relx = x - (self._mouse_last_pos[0] or 0)
                    rely = y - (self._mouse_last_pos[1] or 0)
                    out += [('mousemove', 0, x, y, relx, rely)]
                self._mouse_last_pos = (x, y)

        # we are interested only in changes, not buttons already pressed before event
        if self._mouse_last_bstate is not None:
            old = self._mouse_last_bstate
            new = bstate
            bstate = ~old & new
            self._mouse_last_bstate = new
        else:
            self._mouse_last_bstate = bstate

        if bstate & curses.BUTTON1_PRESSED:
            out += [('mousedown', 1, x, y)]
        if bstate & curses.BUTTON2_PRESSED:
            out += [('mousedown', 2, x, y)]
        if bstate & curses.BUTTON3_PRESSED:
            out += [('mousedown', 3, x, y)]
        if bstate & curses.BUTTON1_RELEASED:
            out += [('mouseup', 1, x, y)]
        if bstate & curses.BUTTON2_RELEASED:
            out += [('mouseup', 2, x, y)]
        if bstate & curses.BUTTON3_RELEASED:
            out += [('mouseup', 3, x, y)]

        # reset last pos when pressed/released
        if len(out) > 0 and out[-1][0] in ('mousedown', 'mouseup'):
            self._mouse_last_pos = (None, None)

        return out


    def process_utf8_chars(self):
        #FIXME read exact number of chars as defined by utf-8
        utf = []
        while len(utf) <= 6:
            c = self.inputqueue_get_wait()
            utf.append(c)
            try:
                uni = str(bytes(utf), 'utf-8')
                return [('keypress', None, uni)]
            except UnicodeDecodeError:
                continue
        raise Exception('Invalid UTF-8 sequence: %r' % utf)


    def process_control_chars(self):
        codes = self.key_codes
        matchingcodes = []
        match = None
        consumed = []

        # consume next char, filter out matching codes
        c = self.inputqueue_get_wait()
        consumed.append(c)

        while True:
            #self.log.debug('c=%s len=%s', c, len(codes))
            for code in codes:
                if c == code[len(consumed)-1]:
                    if len(code) - 1 == len(consumed):
                        match = code
                    else:
                        matchingcodes += [code]

            #self.log.debug('matching=%s', len(matchingcodes))

            # match found, or no matching code found -> stop
            if len(matchingcodes) == 0:
                break

            # match found and some sequencies still match -> continue
            if len(matchingcodes) > 0:
                if len(self.inputqueue) == 0:
                    self.inputqueue_fill(1)

            c = self.inputqueue_get()
            if c:
                consumed.append(c)
                codes = matchingcodes
                matchingcodes = []
            else:
                break

        keyname = None
        if match:
            # compare match to consumed, return unused chars
            l = len(match) - 1
            while len(consumed) > l:
                self.inputqueue_unget(consumed[-1])
                del consumed[-1]
            keyname = match[-1]

        if match is None:
            self.log.debug('Unknown control sequence: %s',
                ','.join(['0x%x'%x for x in consumed]))
            return [('keypress', 'Unknown', None)]

        if keyname == 'mouse':
            return self.process_xterm_mouse()

        if keyname == 'CSI':
            return self.process_control_sequence()

        return [('keypress', keyname, None)]


    def process_xterm_mouse(self):
        t = self.inputqueue_get_wait()
        x = self.inputqueue_get_wait() - 0x21
        y = self.inputqueue_get_wait() - 0x21

        out = []

        if t in (0x20, 0x21, 0x22):
            # button press
            btn = t - 0x1f
            if not btn in self.mbtnstack:
                self.mbtnstack.append(btn)
                self._mouse_last_pos = (None, None)
                out += [('mousedown', btn, x, y)]
            else:
                # mouse move
                if self._mouse_last_pos != (x, y):
                    if self._mouse_last_pos[0] is not None:
                        relx = x - self._mouse_last_pos[0]
                        rely = y - self._mouse_last_pos[1]
                        out += [('mousemove', btn, x, y, relx, rely)]
                    self._mouse_last_pos = (x, y)
        elif t == 0x23:
            # button release
            btn = self.mbtnstack.pop()
            self._mouse_last_pos = (None, None)
            out += [('mouseup', btn, x, y)]
        elif t in (0x60, 0x61):
            # wheel up, down
            btn = 4 + t - 0x60
            out += [('mousewheel', btn, x, y)]
        else:
            raise Exception('Unknown mouse event: %x' % t)

        return out

    def process_control_sequence(self):
        codes = self.csi_codes
        debug_seq = [0x1b, 0x5b]
        c = self.inputqueue_get_wait()
        debug_seq.append(c)

        # numeric parameters?
        params = []
        if chr(c).isdigit():
            params.append(chr(c))
            while True:
                c = self.inputqueue_get_wait()
                debug_seq.append(c)
                if chr(c).isdigit():
                    params[-1] += chr(c)
                elif chr(c) == ';':
                    params.append('')
                else:
                    break
        params = [int(x) for x in params]
        if len(params) == 0:
            params = [1]

        # filter codes using byte sequence
        while True:
            matching_codes = []
            for code in codes:
                if len(code) > 2 and code[0] == c:
                    matching_codes.append(code[1:])
            codes = matching_codes

            if len(codes) == 0:
                # no match -> unknown code
                seq = ','.join(['0x%x' % x for x in debug_seq])
                self.log.debug('Unknown control sequence: %s', seq)
                return [('keypress', 'Unknown:' + seq, None)]
            elif len(codes) == 1:
                # one match -> we got the winner
                break
            elif len(codes[0]) == 2:
                # more than one matching, but no more chars to check
                # will be sorted out using parameters
                break
            else:
                # more than one matching -> continue loop
                c = self.inputqueue_get_wait()
                debug_seq.append(c)

        # filter codes using first parameter
        matching_codes = []
        for code in codes:
            if params[0] == code[0] or params[0] is None:
                matching_codes.append(code)

        if len(matching_codes) == 0:
            # no match -> unknown code
            seq = ','.join(['0x%x' % x for x in debug_seq])
            self.log.debug('Unknown control sequence: %s', seq)
            return [('keypress', 'Unknown:' + seq, None)]

        if len(matching_codes) > 1:
            raise Exception('Internal error: invalid csi_codes, more than one matching')

        keyname = matching_codes[0][1]

        # modifiers
        mod = 0
        if len(params) > 1:
            mod = params[1] - 1

        return [('keypress', keyname, None, mod)]


driverclass = DriverCurses