Dash Core  0.12.2.1
P2P Digital Currency
comptool.py
Go to the documentation of this file.
1 #!/usr/bin/env python2
2 #
3 # Distributed under the MIT/X11 software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 #
6 
7 from .mininode import *
8 from .blockstore import BlockStore, TxStore
9 from .util import p2p_port
10 
11 '''
12 This is a tool for comparing two or more dashds to each other
13 using a script provided.
14 
15 To use, create a class that implements get_tests(), and pass it in
16 as the test generator to TestManager. get_tests() should be a python
17 generator that returns TestInstance objects. See below for definition.
18 '''
19 
20 # TestNode behaves as follows:
21 # Configure with a BlockStore and TxStore
22 # on_inv: log the message but don't request
23 # on_headers: log the chain tip
24 # on_pong: update ping response map (for synchronization)
25 # on_getheaders: provide headers via BlockStore
26 # on_getdata: provide blocks via BlockStore
27 
28 global mininode_lock
29 
30 class RejectResult(object):
31  '''
32  Outcome that expects rejection of a transaction or block.
33  '''
34  def __init__(self, code, reason=b''):
35  self.code = code
36  self.reason = reason
37  def match(self, other):
38  if self.code != other.code:
39  return False
40  return other.reason.startswith(self.reason)
41  def __repr__(self):
42  return '%i:%s' % (self.code,self.reason or '*')
43 
45 
46  def __init__(self, block_store, tx_store):
47  NodeConnCB.__init__(self)
48  self.conn = None
49  self.bestblockhash = None
50  self.block_store = block_store
52  self.tx_store = tx_store
53  self.tx_request_map = {}
54  self.block_reject_map = {}
55  self.tx_reject_map = {}
56 
57  # When the pingmap is non-empty we're waiting for
58  # a response
59  self.pingMap = {}
60  self.lastInv = []
61  self.closed = False
62 
63  def on_close(self, conn):
64  self.closed = True
65 
66  def add_connection(self, conn):
67  self.conn = conn
68 
69  def on_headers(self, conn, message):
70  if len(message.headers) > 0:
71  best_header = message.headers[-1]
72  best_header.calc_sha256()
73  self.bestblockhash = best_header.sha256
74 
75  def on_getheaders(self, conn, message):
76  response = self.block_store.headers_for(message.locator, message.hashstop)
77  if response is not None:
78  conn.send_message(response)
79 
80  def on_getdata(self, conn, message):
81  [conn.send_message(r) for r in self.block_store.get_blocks(message.inv)]
82  [conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
83 
84  for i in message.inv:
85  if i.type == 1:
86  self.tx_request_map[i.hash] = True
87  elif i.type == 2:
88  self.block_request_map[i.hash] = True
89 
90  def on_inv(self, conn, message):
91  self.lastInv = [x.hash for x in message.inv]
92 
93  def on_pong(self, conn, message):
94  try:
95  del self.pingMap[message.nonce]
96  except KeyError:
97  raise AssertionError("Got pong for unknown ping [%s]" % repr(message))
98 
99  def on_reject(self, conn, message):
100  if message.message == b'tx':
101  self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
102  if message.message == b'block':
103  self.block_reject_map[message.data] = RejectResult(message.code, message.reason)
104 
105  def send_inv(self, obj):
106  mtype = 2 if isinstance(obj, CBlock) else 1
107  self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))
108 
109  def send_getheaders(self):
110  # We ask for headers from their last tip.
111  m = msg_getheaders()
112  m.locator = self.block_store.get_locator(self.bestblockhash)
113  self.conn.send_message(m)
114 
115  # This assumes BIP31
116  def send_ping(self, nonce):
117  self.pingMap[nonce] = True
118  self.conn.send_message(msg_ping(nonce))
119 
120  def received_ping_response(self, nonce):
121  return nonce not in self.pingMap
122 
123  def send_mempool(self):
124  self.lastInv = []
125  self.conn.send_message(msg_mempool())
126 
127 # TestInstance:
128 #
129 # Instances of these are generated by the test generator, and fed into the
130 # comptool.
131 #
132 # "blocks_and_transactions" should be an array of
133 # [obj, True/False/None, hash/None]:
134 # - obj is either a CBlock, CBlockHeader, or a CTransaction, and
135 # - the second value indicates whether the object should be accepted
136 # into the blockchain or mempool (for tests where we expect a certain
137 # answer), or "None" if we don't expect a certain answer and are just
138 # comparing the behavior of the nodes being tested.
139 # - the third value is the hash to test the tip against (if None or omitted,
140 # use the hash of the block)
141 # - NOTE: if a block header, no test is performed; instead the header is
142 # just added to the block_store. This is to facilitate block delivery
143 # when communicating with headers-first clients (when withholding an
144 # intermediate block).
145 # sync_every_block: if True, then each block will be inv'ed, synced, and
146 # nodes will be tested based on the outcome for the block. If False,
147 # then inv's accumulate until all blocks are processed (or max inv size
148 # is reached) and then sent out in one inv message. Then the final block
149 # will be synced across all connections, and the outcome of the final
150 # block will be tested.
151 # sync_every_tx: analogous to behavior for sync_every_block, except if outcome
152 # on the final tx is None, then contents of entire mempool are compared
153 # across all connections. (If outcome of final tx is specified as true
154 # or false, then only the last tx is tested against outcome.)
155 
156 class TestInstance(object):
157  def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False):
158  self.blocks_and_transactions = objects if objects else []
159  self.sync_every_block = sync_every_block
160  self.sync_every_tx = sync_every_tx
161 
162 class TestManager(object):
163 
164  def __init__(self, testgen, datadir):
165  self.test_generator = testgen
166  self.connections = []
167  self.test_nodes = []
168  self.block_store = BlockStore(datadir)
169  self.tx_store = TxStore(datadir)
170  self.ping_counter = 1
171 
172  def add_all_connections(self, nodes):
173  for i in range(len(nodes)):
174  # Create a p2p connection to each node
175  test_node = TestNode(self.block_store, self.tx_store)
176  self.test_nodes.append(test_node)
177  self.connections.append(NodeConn('127.0.0.1', p2p_port(i), nodes[i], test_node))
178  # Make sure the TestNode (callback class) has a reference to its
179  # associated NodeConn
180  test_node.add_connection(self.connections[-1])
181 
183  self.connections = []
184  self.test_nodes = []
185 
187  def disconnected():
188  return all(node.closed for node in self.test_nodes)
189  return wait_until(disconnected, timeout=10)
190 
191  def wait_for_verack(self):
192  def veracked():
193  return all(node.verack_received for node in self.test_nodes)
194  return wait_until(veracked, timeout=10)
195 
196  def wait_for_pings(self, counter):
197  def received_pongs():
198  return all(node.received_ping_response(counter) for node in self.test_nodes)
199  return wait_until(received_pongs)
200 
201  # sync_blocks: Wait for all connections to request the blockhash given
202  # then send get_headers to find out the tip of each node, and synchronize
203  # the response by using a ping (and waiting for pong with same nonce).
204  def sync_blocks(self, blockhash, num_blocks):
205  def blocks_requested():
206  return all(
207  blockhash in node.block_request_map and node.block_request_map[blockhash]
208  for node in self.test_nodes
209  )
210 
211  # --> error if not requested
212  if not wait_until(blocks_requested, attempts=20*num_blocks):
213  # print [ c.cb.block_request_map for c in self.connections ]
214  raise AssertionError("Not all nodes requested block")
215 
216  # Send getheaders message
217  [ c.cb.send_getheaders() for c in self.connections ]
218 
219  # Send ping and wait for response -- synchronization hack
220  [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
221  self.wait_for_pings(self.ping_counter)
222  self.ping_counter += 1
223 
224  # Analogous to sync_block (see above)
225  def sync_transaction(self, txhash, num_events):
226  # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
227  def transaction_requested():
228  return all(
229  txhash in node.tx_request_map and node.tx_request_map[txhash]
230  for node in self.test_nodes
231  )
232 
233  # --> error if not requested
234  if not wait_until(transaction_requested, attempts=20*num_events):
235  # print [ c.cb.tx_request_map for c in self.connections ]
236  raise AssertionError("Not all nodes requested transaction")
237 
238  # Get the mempool
239  [ c.cb.send_mempool() for c in self.connections ]
240 
241  # Send ping and wait for response -- synchronization hack
242  [ c.cb.send_ping(self.ping_counter) for c in self.connections ]
243  self.wait_for_pings(self.ping_counter)
244  self.ping_counter += 1
245 
246  # Sort inv responses from each node
247  with mininode_lock:
248  [ c.cb.lastInv.sort() for c in self.connections ]
249 
250  # Verify that the tip of each connection all agree with each other, and
251  # with the expected outcome (if given)
252  def check_results(self, blockhash, outcome):
253  with mininode_lock:
254  for c in self.connections:
255  if outcome is None:
256  if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
257  return False
258  elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
259  if c.cb.bestblockhash == blockhash:
260  return False
261  if blockhash not in c.cb.block_reject_map:
262  print 'Block not in reject map: %064x' % (blockhash)
263  return False
264  if not outcome.match(c.cb.block_reject_map[blockhash]):
265  print 'Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash)
266  return False
267  elif ((c.cb.bestblockhash == blockhash) != outcome):
268  # print c.cb.bestblockhash, blockhash, outcome
269  return False
270  return True
271 
272  # Either check that the mempools all agree with each other, or that
273  # txhash's presence in the mempool matches the outcome specified.
274  # This is somewhat of a strange comparison, in that we're either comparing
275  # a particular tx to an outcome, or the entire mempools altogether;
276  # perhaps it would be useful to add the ability to check explicitly that
277  # a particular tx's existence in the mempool is the same across all nodes.
278  def check_mempool(self, txhash, outcome):
279  with mininode_lock:
280  for c in self.connections:
281  if outcome is None:
282  # Make sure the mempools agree with each other
283  if c.cb.lastInv != self.connections[0].cb.lastInv:
284  # print c.rpc.getrawmempool()
285  return False
286  elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
287  if txhash in c.cb.lastInv:
288  return False
289  if txhash not in c.cb.tx_reject_map:
290  print 'Tx not in reject map: %064x' % (txhash)
291  return False
292  if not outcome.match(c.cb.tx_reject_map[txhash]):
293  print 'Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash)
294  return False
295  elif ((txhash in c.cb.lastInv) != outcome):
296  # print c.rpc.getrawmempool(), c.cb.lastInv
297  return False
298  return True
299 
300  def run(self):
301  # Wait until verack is received
302  self.wait_for_verack()
303 
304  test_number = 1
305  for test_instance in self.test_generator.get_tests():
306  # We use these variables to keep track of the last block
307  # and last transaction in the tests, which are used
308  # if we're not syncing on every block or every tx.
309  [ block, block_outcome, tip ] = [ None, None, None ]
310  [ tx, tx_outcome ] = [ None, None ]
311  invqueue = []
312 
313  for test_obj in test_instance.blocks_and_transactions:
314  b_or_t = test_obj[0]
315  outcome = test_obj[1]
316  # Determine if we're dealing with a block or tx
317  if isinstance(b_or_t, CBlock): # Block test runner
318  block = b_or_t
319  block_outcome = outcome
320  tip = block.sha256
321  # each test_obj can have an optional third argument
322  # to specify the tip we should compare with
323  # (default is to use the block being tested)
324  if len(test_obj) >= 3:
325  tip = test_obj[2]
326 
327  # Add to shared block_store, set as current block
328  # If there was an open getdata request for the block
329  # previously, and we didn't have an entry in the
330  # block_store, then immediately deliver, because the
331  # node wouldn't send another getdata request while
332  # the earlier one is outstanding.
333  first_block_with_hash = True
334  if self.block_store.get(block.sha256) is not None:
335  first_block_with_hash = False
336  with mininode_lock:
337  self.block_store.add_block(block)
338  for c in self.connections:
339  if first_block_with_hash and block.sha256 in c.cb.block_request_map and c.cb.block_request_map[block.sha256] == True:
340  # There was a previous request for this block hash
341  # Most likely, we delivered a header for this block
342  # but never had the block to respond to the getdata
343  c.send_message(msg_block(block))
344  else:
345  c.cb.block_request_map[block.sha256] = False
346  # Either send inv's to each node and sync, or add
347  # to invqueue for later inv'ing.
348  if (test_instance.sync_every_block):
349  [ c.cb.send_inv(block) for c in self.connections ]
350  self.sync_blocks(block.sha256, 1)
351  if (not self.check_results(tip, outcome)):
352  raise AssertionError("Test failed at test %d" % test_number)
353  else:
354  invqueue.append(CInv(2, block.sha256))
355  elif isinstance(b_or_t, CBlockHeader):
356  block_header = b_or_t
357  self.block_store.add_header(block_header)
358  else: # Tx test runner
359  assert(isinstance(b_or_t, CTransaction))
360  tx = b_or_t
361  tx_outcome = outcome
362  # Add to shared tx store and clear map entry
363  with mininode_lock:
364  self.tx_store.add_transaction(tx)
365  for c in self.connections:
366  c.cb.tx_request_map[tx.sha256] = False
367  # Again, either inv to all nodes or save for later
368  if (test_instance.sync_every_tx):
369  [ c.cb.send_inv(tx) for c in self.connections ]
370  self.sync_transaction(tx.sha256, 1)
371  if (not self.check_mempool(tx.sha256, outcome)):
372  raise AssertionError("Test failed at test %d" % test_number)
373  else:
374  invqueue.append(CInv(1, tx.sha256))
375  # Ensure we're not overflowing the inv queue
376  if len(invqueue) == MAX_INV_SZ:
377  [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
378  invqueue = []
379 
380  # Do final sync if we weren't syncing on every block or every tx.
381  if (not test_instance.sync_every_block and block is not None):
382  if len(invqueue) > 0:
383  [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
384  invqueue = []
385  self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
386  if (not self.check_results(tip, block_outcome)):
387  raise AssertionError("Block test failed at test %d" % test_number)
388  if (not test_instance.sync_every_tx and tx is not None):
389  if len(invqueue) > 0:
390  [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
391  invqueue = []
392  self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
393  if (not self.check_mempool(tx.sha256, tx_outcome)):
394  raise AssertionError("Mempool test failed at test %d" % test_number)
395 
396  print "Test %d: PASS" % test_number, [ c.rpc.getblockcount() for c in self.connections ]
397  test_number += 1
398 
399  [ c.disconnect_node() for c in self.connections ]
401  self.block_store.close()
402  self.tx_store.close()
def __init__(self, testgen, datadir)
Definition: comptool.py:164
def __init__(self, objects=None, sync_every_block=True, sync_every_tx=False)
Definition: comptool.py:157
def on_pong(self, conn, message)
Definition: comptool.py:93
def __init__(self, code, reason=b'')
Definition: comptool.py:34
def send_ping(self, nonce)
Definition: comptool.py:116
def wait_until(predicate, attempts=float('inf'), timeout=float('inf'))
Definition: mininode.py:1020
def sync_transaction(self, txhash, num_events)
Definition: comptool.py:225
def add_all_connections(self, nodes)
Definition: comptool.py:172
def on_reject(self, conn, message)
Definition: comptool.py:99
def on_headers(self, conn, message)
Definition: comptool.py:69
def check_results(self, blockhash, outcome)
Definition: comptool.py:252
def wait_for_pings(self, counter)
Definition: comptool.py:196
def on_inv(self, conn, message)
Definition: comptool.py:90
def on_getheaders(self, conn, message)
Definition: comptool.py:75
def add_connection(self, conn)
Definition: comptool.py:66
def sync_blocks(self, blockhash, num_blocks)
Definition: comptool.py:204
def check_mempool(self, txhash, outcome)
Definition: comptool.py:278
def on_getdata(self, conn, message)
Definition: comptool.py:80
def p2p_port(n)
Definition: util.py:93
def received_ping_response(self, nonce)
Definition: comptool.py:120
def __init__(self, block_store, tx_store)
Definition: comptool.py:46