pycolib/soap.py
author Radek Brich <radek.brich@devl.cz>
Wed, 03 Apr 2013 20:37:05 +0200
changeset 1 ee31f1bf17c1
parent 0 soap.py@ee24ce33ab55
permissions -rw-r--r--
Add ansicolor, prettysize, makeurl and README.

# minisoap - minimal SOAP client
#
# Copyright (c) 2011, 2013  Radek Brich <radek.brich@devl.cz>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.


from http.client import HTTPConnection
from urllib.parse import urlparse
from lxml import etree
from collections import OrderedDict

# Module-wide debug switch. When truthy, SoapClient prints the request
# envelope it builds and the reply element it is about to decode.
debug = 0
# The value is copied onto the HTTPConnection class once, at import time:
# this sets a class attribute (not a per-connection one), and rebinding
# `debug` later does not update it.
HTTPConnection.debuglevel = debug


class SoapError(Exception):
    """Error raised by this module for failed or malformed SOAP exchanges."""


class SoapResponse(OrderedDict):
    """Ordered mapping of reply parameters, labelled with a name.

    Values are either scalars (rendered with ``str()``) or lists of nested
    mappings, one mapping per repeated element. ``str(response)`` renders
    the name followed by an indented outline of the content.
    """

    def __init__(self, name):
        """Create an empty response labelled *name*."""
        OrderedDict.__init__(self)
        self.name = name

    def copy(self):
        """Return a shallow copy that keeps both the name and the items.

        The inherited ``OrderedDict.copy()`` re-instantiates the class
        without knowing about the mandatory *name* argument, so it cannot
        produce a usable SoapResponse.
        """
        duplicate = type(self)(self.name)
        duplicate.update(self)
        return duplicate

    def _printout(self, out, res, level):
        """Append the text outline of mapping *res* to the list *out*.

        out -- list of string fragments, extended in place
        res -- mapping to render (the response itself or a nested row)
        level -- nesting depth; each level indents by two spaces
        """
        indent = level * 2 * ' '
        for key, value in res.items():
            # isinstance (not an exact type check) so list subclasses are
            # expanded as rows too instead of being dumped via str().
            if isinstance(value, list):
                for row in value:
                    # The key is repeated as a heading before every row.
                    out.extend([indent, key, '\n'])
                    self._printout(out, row, level + 1)
            else:
                out.extend([indent, key, ': ', str(value), '\n'])

    def __str__(self):
        """Return the name followed by the indented outline of all items."""
        out = [self.name, '\n']
        self._printout(out, self, 1)
        # Drop the trailing newline left by the last rendered line.
        return ''.join(out).rstrip()


class SoapClient:
    """Minimal SOAP client that maps attribute access to remote calls.

    Any public attribute not found through normal lookup is treated as the
    name of a SOAP operation: ``client.Foo(a=1)`` builds an Envelope whose
    Body holds a ``Foo`` element in *namespace* with one child element per
    keyword argument, POSTs it to *url* and returns the decoded reply as a
    SoapResponse.
    """

    def __init__(self, url, namespace):
        """Remember the endpoint and create the HTTP connection object.

        url -- endpoint address; its network location is used for the
               connection and its path for the POST request
        namespace -- XML namespace of the request element
        """
        self.url = url
        self.parsedurl = urlparse(url)
        self.namespace = namespace
        # NOTE: always a plain HTTPConnection; the URL scheme is not checked.
        self.conn = HTTPConnection(self.parsedurl.netloc)

    def _build_soap_message(self, name, args):
        """Return the Envelope element for calling *name* with *args*.

        name -- local name of the request element placed inside Body
        args -- mapping of parameter name to value; every value is
                converted with ``str()`` and stored as element text
        """
        SOAP = 'http://schemas.xmlsoap.org/soap/envelope/'
        root = etree.Element('{%s}Envelope' % SOAP, nsmap={'soap': SOAP})
        body = etree.SubElement(root, '{%s}Body' % SOAP)
        # The request element declares self.namespace as default namespace.
        func = etree.SubElement(body, '{%s}%s' % (self.namespace, name),
                                nsmap={None: self.namespace})
        for key, value in args.items():
            param = etree.SubElement(func, key)
            param.text = str(value)
        if debug:
            print(etree.tostring(root, pretty_print=True).decode('utf8'))
        return root

    def _break_params(self, func, d):
        """Decode the children of element *func* into mapping *d*.

        A child without sub-elements is stored as ``d[tag] = text``. A
        child with sub-elements is decoded recursively into a new
        OrderedDict that is appended to the list kept under ``d[tag]``,
        so repeated structured children accumulate as rows.
        """
        for param in func.iterchildren():
            if len(param) == 0:
                d[param.tag] = param.text
            else:
                newd = OrderedDict()
                self._break_params(param, newd)
                d.setdefault(param.tag, []).append(newd)

    def _break_soap_message(self, root):
        """Decode the reply Envelope *root* into a SoapResponse.

        The response is named after the tag of the single element found
        inside Body and filled from that element's children.

        Raises SoapError when the root is not an Envelope, when there is
        not exactly one Body, or when Body does not have exactly one child.
        """
        if not root.tag.endswith('}Envelope'):
            raise SoapError('Reply has no Envelope.')

        # Reuse whatever namespace the reply's Envelope uses ('{ns}Envelope').
        SOAP = root.tag.split('}')[0][1:]

        body_list = list(root.iterchildren('{%s}Body' % SOAP))
        if len(body_list) != 1:
            raise SoapError('Bad SOAP format (%d Body elements)' % len(body_list))
        body = body_list[0]

        func_list = list(body.iterchildren())
        if len(func_list) != 1:
            # Report the number of Body children, not the number of Bodies.
            raise SoapError('Bad SOAP format (Body has %d children, should have one)' % len(func_list))
        func = func_list[0]

        if debug:
            print(etree.tostring(func, pretty_print=True).decode('utf8'))
        res = SoapResponse(func.tag)
        self._break_params(func, res)

        return res

    def _serialize_xml(self, root):
        """Serialize *root* as pretty-printed UTF-8 with an XML declaration."""
        return etree.tostring(root,
            pretty_print=True,
            encoding='utf-8',
            xml_declaration=True)

    def _parse_xml(self, file):
        """Parse *file* with ``etree.parse`` and return the root element.

        ``__getattr__`` passes the HTTP response object here.
        """
        tree = etree.parse(file)
        return tree.getroot()

    def _send_soap(self, body):
        """POST *body* to the endpoint path and return the HTTP response.

        Raises SoapError when the status code is not 200.
        """
        headers = {
            'Content-type': "text/xml;charset='utf-8'",
            'Soapaction': "''",
            'Accept': 'text/xml'}
        self.conn.request('POST', self.parsedurl.path, body, headers)
        resp = self.conn.getresponse()
        if resp.status != 200:
            # The connection is reused for later calls, and http.client
            # requires the previous response to be fully read before the
            # next request can be sent, so drain the body before raising.
            resp.read()
            raise SoapError('Server replied %d %s' % (resp.status, resp.reason))
        return resp

    def __getattr__(self, name):
        """Return a function that performs the SOAP call named *name*.

        Python calls this only when normal attribute lookup fails. Dunder
        names raise AttributeError instead, so protocol probes such as
        ``hasattr(client, '__len__')`` are not answered with a fake method.

        The returned function takes the call parameters as keyword
        arguments and returns a SoapResponse.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)

        def fnc(**kwargs):
            root = self._build_soap_message(name, kwargs)
            body = self._serialize_xml(root)
            resp = self._send_soap(body)
            root = self._parse_xml(resp)
            return self._break_soap_message(root)

        return fnc


if __name__ == '__main__':
    # Manual example: call `my_test_request` without parameters on a
    # hard-coded endpoint and print the decoded reply.
    service = SoapClient(
        url='http://10.0.0.1:34567/',
        namespace='http://devl.cz/',
    )
    res = service.my_test_request()
    print(str(res))