Source code for M2Crypto.m2xmlrpclib

"""M2Crypto enhancement to xmlrpclib.

Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""

import base64
import re

from M2Crypto import __version__ as __M2Version

from M2Crypto import SSL, httpslib, m2urllib2  # type: ignore
from typing import Callable, Optional, Union, Any, Dict, Tuple  # noqa

from xmlrpc.client import ProtocolError, Transport
from xmlrpc.client import *  # noqa


__version__ = __M2Version

HostType = Union[str, Tuple[str, Dict[str, str]]]
Marshallable = Any
SizedBufferType = Union[bytes, bytearray]


[docs] class SSL_Transport(Transport): user_agent = "M2Crypto_XMLRPC/%s - %s" % ( __version__, Transport.user_agent, ) def __init__(self, ssl_context: Optional[SSL.Context] = None, *args, **kw) -> None: Transport.__init__(self, *args, **kw) if ssl_context is None: self.ssl_ctx = SSL.Context() else: # FIX: Add assertion to make test_init_junk_ssl_context pass assert isinstance(ssl_context, SSL.Context), ssl_context self.ssl_ctx = ssl_context
[docs] @staticmethod def splituser(host): """splituser('user[:passwd]@host[:port]') --> 'user[:passwd]', 'host[:port]'.""" match = re.match("^(.*)@(.*)$", host) if match: return match.group(1, 2) return None, host
[docs] @staticmethod def splitport(host): """splitport('host:port') --> 'host', 'port'.""" match = re.match("^(.*):([0-9]*)$", host) if match: host, port = match.groups() if port: return host, port return host, None
# This ignore is necessary because SizedBuffer is a private Protocol we cannot import # and satisfy fully at the argument level.
[docs] def request( self, host: HostType, handler: str, request_body: SizedBufferType, # type: ignore[override] verbose: int = 0, ) -> Tuple[Marshallable, ...]: # Handle username and password. user_passwd, host_port = self.splituser(host) _host, _port = self.splitport(host_port) _host_str = _host.decode("latin-1") if isinstance(_host, bytes) else _host h = httpslib.HTTPSConnection(_host_str, int(_port), ssl_context=self.ssl_ctx) # type: ignore[call-arg] try: if verbose: h.set_debuglevel(1) h.putrequest("POST", handler) # required by HTTP/1.1 h.putheader("Host", _host_str) # required by XML-RPC h.putheader("User-Agent", self.user_agent) h.putheader("Content-Type", "text/xml") h.putheader("Content-Length", str(len(request_body))) # Authorisation. if user_passwd is not None: auth = base64.encodebytes(user_passwd).strip() h.putheader("Authorization", "Basic %s" % auth.decode("ascii")) h.endheaders() if request_body: h.send(request_body) response = h.getresponse() host_str = host if isinstance(host, str) else str(host) headers_dict = dict(response.getheaders()) if response.status != 200: raise ProtocolError( host_str + handler, response.status, response.reason, headers_dict, ) self.verbose = verbose return self.parse_response(response) finally: h.close()