9 from xml.etree import cElementTree as ET
10 EXPECTED_REQUESTS = []
12 class RequestWrongOrder(Exception):
13 """raised if an unexpected request is issued to urllib2"""
14 def __init__(self, url, exp_url, method, exp_method):
15 Exception.__init__(self)
17 self.exp_url = exp_url
19 self.exp_method = exp_method
22 return '%s, %s, %s, %s' % (self.url, self.exp_url, self.method, self.exp_method)
24 class RequestDataMismatch(Exception):
25 """raised if POSTed or PUTed data doesn't match with the expected data"""
26 def __init__(self, url, got, exp):
32 return '%s, %s, %s' % (self.url, self.got, self.exp)
34 class MyHTTPHandler(urllib2.HTTPHandler):
35 def __init__(self, exp_requests, fixtures_dir):
36 urllib2.HTTPHandler.__init__(self)
37 self.__exp_requests = exp_requests
38 self.__fixtures_dir = fixtures_dir
40 def http_open(self, req):
41 r = self.__exp_requests.pop(0)
42 if req.get_full_url() != r[1] or req.get_method() != r[0]:
43 raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0])
44 if req.get_method() in ('GET', 'DELETE'):
45 return self.__mock_GET(r[1], **r[2])
46 elif req.get_method() in ('PUT', 'POST'):
47 return self.__mock_PUT(req, **r[2])
49 def __mock_GET(self, fullurl, **kwargs):
50 return self.__get_response(fullurl, **kwargs)
52 def __mock_PUT(self, req, **kwargs):
53 exp = kwargs.get('exp', None)
54 if exp is not None and kwargs.has_key('expfile'):
55 raise RuntimeError('either specify exp or expfile')
56 elif kwargs.has_key('expfile'):
57 exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'r').read()
59 if req.get_data() != exp:
60 raise RequestDataMismatch(req.get_full_url(), req.get_data(), exp)
61 return self.__get_response(req.get_full_url(), **kwargs)
63 def __get_response(self, url, **kwargs):
65 if kwargs.has_key('exception'):
66 raise kwargs['exception']
67 if not kwargs.has_key('text') and kwargs.has_key('file'):
68 f = StringIO.StringIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'r').read())
69 elif kwargs.has_key('text') and not kwargs.has_key('file'):
70 f = StringIO.StringIO(kwargs['text'])
72 raise RuntimeError('either specify text or file')
73 resp = urllib2.addinfourl(f, '', url)
78 def urldecorator(method, fullurl, **kwargs):
79 def decorate(test_method):
80 def wrapped_test_method(*args):
81 addExpectedRequest(method, fullurl, **kwargs)
83 # "rename" method otherwise we cannot specify a TestCaseClass.testName
84 # cmdline arg when using unittest.main()
85 wrapped_test_method.__name__ = test_method.__name__
86 return wrapped_test_method
89 def GET(fullurl, **kwargs):
90 return urldecorator('GET', fullurl, **kwargs)
92 def PUT(fullurl, **kwargs):
93 return urldecorator('PUT', fullurl, **kwargs)
95 def POST(fullurl, **kwargs):
96 return urldecorator('POST', fullurl, **kwargs)
98 def DELETE(fullurl, **kwargs):
99 return urldecorator('DELETE', fullurl, **kwargs)
101 def addExpectedRequest(method, url, **kwargs):
102 global EXPECTED_REQUESTS
103 EXPECTED_REQUESTS.append((method, url, kwargs))
105 class OscTestCase(unittest.TestCase):
107 osc.core.conf.get_config(override_conffile=os.path.join(self._get_fixtures_dir(), 'oscrc'))
108 self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
109 shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest'))
110 global EXPECTED_REQUESTS
111 EXPECTED_REQUESTS = []
112 osc.core.conf._build_opener = lambda u: urllib2.build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir()))
113 self.stdout = sys.stdout
114 sys.stdout = StringIO.StringIO()
117 self.assertTrue(len(EXPECTED_REQUESTS) == 0)
118 sys.stdout = self.stdout
120 shutil.rmtree(self.tmpdir)
124 def _get_fixtures_dir(self):
125 raise NotImplementedError('subclasses should implement this method')
127 def _change_to_pkg(self, name):
128 os.chdir(os.path.join(self.tmpdir, 'osctest', name))
130 def _check_list(self, fname, exp):
131 fname = os.path.join('.osc', fname)
132 self.assertTrue(os.path.exists(fname))
133 self.assertEqual(open(fname, 'r').read(), exp)
135 def _check_addlist(self, exp):
136 self._check_list('_to_be_added', exp)
138 def _check_deletelist(self, exp):
139 self._check_list('_to_be_deleted', exp)
141 def _check_conflictlist(self, exp):
142 self._check_list('_in_conflict', exp)
144 def _check_status(self, p, fname, exp):
145 self.assertEqual(p.status(fname), exp)
147 def _check_digests(self, fname, *skipfiles):
148 fname = os.path.join(self._get_fixtures_dir(), fname)
149 self.assertEqual(open(os.path.join('.osc', '_files'), 'r').read(), open(fname, 'r').read())
150 root = ET.parse(fname).getroot()
151 for i in root.findall('entry'):
152 if i.get('name') in skipfiles:
154 self.assertTrue(os.path.exists(os.path.join('.osc', i.get('name'))))
155 self.assertEqual(osc.core.dgst(os.path.join('.osc', i.get('name'))), i.get('md5'))