| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import usocket
class Response:
    """Lazy wrapper around the stream an HTTP response body is read from.

    The body is read on first access to ``content`` and cached. The
    underlying stream is closed as soon as that read finishes, whether it
    succeeded or raised.
    """

    def __init__(self, f):
        # f: stream object; only its read() and close() methods are used here.
        self.raw = f
        # Encoding used by ``text`` to decode the body.
        self.encoding = "utf-8"
        # Body bytes once read; None until ``content`` is first accessed.
        self._cached = None

    def close(self):
        """Close the underlying stream if still open and drop the cached body."""
        if self.raw:
            self.raw.close()
            self.raw = None
        self._cached = None

    @property
    def content(self):
        """Return the response body, reading and caching it on first access.

        The stream is closed and ``raw`` reset to None even when ``read()``
        raises, so a failed read does not leak the connection.
        """
        if self._cached is None:
            try:
                self._cached = self.raw.read()
            finally:
                # Always release the stream; the body can only be read once.
                self.raw.close()
                self.raw = None
        return self._cached

    @property
    def text(self):
        """Return the body decoded with ``self.encoding``."""
        return str(self.content, self.encoding)

    def json(self):
        """Parse the body with ``ujson.loads`` and return the result."""
        import ujson
        return ujson.loads(self.content)
def request(method, url, data=None, json=None, headers={}, stream=None, params=None):
    """Send one HTTP/1.0 request and return a ``Response`` for the reply.

    Args:
        method: HTTP method name written into the request line (e.g. "GET").
        url: "http://host[:port]/path" or "https://host[:port]/path".
        data: Optional request body. A ``str`` body is encoded as UTF-8 so
            that Content-Length counts bytes rather than characters.
        json: Optional object serialized with ``ujson.dumps`` and sent as the
            body. Must not be combined with ``data``.
        headers: Mapping of header name to value, written verbatim. The
            default dict is only read, never mutated.
        stream: Accepted for interface compatibility; unused by this function.
        params: Optional mapping appended to the path as a query string.
            Keys and values are converted with ``str()`` but are NOT
            percent-encoded; callers must pre-encode special characters.

    Returns:
        A ``Response`` whose ``status_code`` (int) and ``reason`` (bytes,
        empty when the server sends no reason phrase) come from the status
        line. The body is left unread on the response's stream.

    Raises:
        ValueError: unsupported protocol, malformed status line, or a
            chunked Transfer-Encoding reply.
        NotImplementedError: a non-2xx reply carrying a Location header.
        AssertionError: both ``data`` and ``json`` were supplied.
    """
    try:
        proto, _, host, path = url.split("/", 3)
    except ValueError:
        # URL has no path component after the host.
        proto, _, host = url.split("/", 2)
        path = ""

    if proto == "http:":
        port = 80
    elif proto == "https:":
        import ussl
        port = 443
    else:
        raise ValueError("Unsupported protocol: " + proto)

    if ":" in host:
        host, port = host.split(":", 1)
        port = int(port)

    if params:
        # Join pairs with "&" (no leading "&") and extend an existing query
        # instead of appending a second "?".
        query = "&".join([str(k) + "=" + str(params[k]) for k in params])
        path = path + ("&" if "?" in path else "?") + query

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]
    s = usocket.socket()
    try:
        s.connect(addr)
        if proto == "https:":
            s = ussl.wrap_socket(s)

        s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
        if "Host" not in headers:
            s.write(b"Host: %s\r\n" % host)
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            s.write(k)
            s.write(b": ")
            s.write(headers[k])
            s.write(b"\r\n")

        if json is not None:
            assert data is None
            import ujson
            data = ujson.dumps(json)
        if data:
            if isinstance(data, str):
                # Content-Length must be the byte length, not the char count.
                data = bytes(data, "utf-8")
            s.write(b"Content-Length: %d\r\n" % len(data))
        s.write(b"\r\n")
        if data:
            s.write(data)

        line = s.readline()
        # The reason phrase is optional, so do not require three fields.
        parts = line.split(None, 2)
        if len(parts) < 2:
            raise ValueError("Invalid status line")
        status = int(parts[1])
        reason = parts[2].rstrip() if len(parts) > 2 else b""

        # Consume response headers up to the blank separator line.
        while True:
            line = s.readline()
            if not line or line == b"\r\n":
                break
            if line.startswith(b"Transfer-Encoding:"):
                if b"chunked" in line:
                    # line is bytes; decode before concatenating with str.
                    raise ValueError("Unsupported " + str(line, "utf-8"))
            elif line.startswith(b"Location:") and not 200 <= status <= 299:
                raise NotImplementedError("Redirects not yet supported")
    except Exception:
        # Do not leak the socket when any step of the exchange fails.
        s.close()
        raise

    resp = Response(s)
    resp.status_code = status
    resp.reason = reason
    return resp
def head(url, **kwargs):
    """Send a HEAD request to ``url``; keyword arguments are passed to ``request``."""
    response = request("HEAD", url, **kwargs)
    return response
def get(url, **kwargs):
    """Send a GET request to ``url``; keyword arguments are passed to ``request``."""
    response = request("GET", url, **kwargs)
    return response
def post(url, **kwargs):
    """Send a POST request to ``url``; keyword arguments are passed to ``request``."""
    response = request("POST", url, **kwargs)
    return response
def put(url, **kwargs):
    """Send a PUT request to ``url``; keyword arguments are passed to ``request``."""
    response = request("PUT", url, **kwargs)
    return response
def patch(url, **kwargs):
    """Send a PATCH request to ``url``; keyword arguments are passed to ``request``."""
    response = request("PATCH", url, **kwargs)
    return response
def delete(url, **kwargs):
    """Send a DELETE request to ``url``; keyword arguments are passed to ``request``."""
    response = request("DELETE", url, **kwargs)
    return response
|