self.is_network_split=False
self.sync_all()
- # Returns txid if operation was a success or None
- def wait_and_assert_operationid_status(self, myopid, in_status='success', in_errormsg=None):
- print('waiting for async operation {}'.format(myopid))
- opids = []
- opids.append(myopid)
- timeout = 300
- status = None
- errormsg = None
- txid = None
- for x in xrange(1, timeout):
- results = self.nodes[0].z_getoperationresult(opids)
- if len(results)==0:
- time.sleep(1)
- else:
- status = results[0]["status"]
- if status == "failed":
- errormsg = results[0]['error']['message']
- elif status == "success":
- txid = results[0]['result']['txid']
- break
- print('...returned status: {}'.format(status))
- assert_equal(in_status, status)
- if errormsg is not None:
- assert(in_errormsg is not None)
- assert_equal(in_errormsg in errormsg, True)
- print('...returned error: {}'.format(errormsg))
- return txid
-
def run_test (self):
print "Mining blocks..."
from decimal import Decimal
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import assert_equal, assert_greater_than, start_nodes, initialize_chain_clean, connect_nodes_bi
+from test_framework.util import assert_equal, assert_greater_than, start_nodes,\
+ initialize_chain_clean, connect_nodes_bi, wait_and_assert_operationid_status
import logging
import time
self.is_network_split=False
self.sync_all()
- # TODO: Refactor in z_addr test_framework file
- # Returns txid if operation was a success or None
- def wait_and_assert_operationid_status(self, node, myopid, in_status='success', in_errormsg=None):
- print('waiting for async operation {}'.format(myopid))
- opids = []
- opids.append(myopid)
- timeout = 300
- status = None
- errormsg = None
- txid = None
- for x in xrange(1, timeout):
- results = node.z_getoperationresult(opids)
- if len(results)==0:
- time.sleep(1)
- else:
- print("Results", results[0])
- status = results[0]["status"]
- if status == "failed":
- errormsg = results[0]['error']['message']
- elif status == "success":
- txid = results[0]['result']['txid']
- break
- print('...returned status: {}'.format(status))
- assert_equal(in_status, status)
- if errormsg is not None:
- assert(in_errormsg is not None)
- assert_equal(in_errormsg in errormsg, True)
- print('...returned error: {}'.format(errormsg))
- return txid
-
def run_test(self):
[alice, bob, charlie, david, miner] = self.nodes
def z_send(from_node, from_addr, to_addr, amount):
opid = from_node.z_sendmany(from_addr, [{"address": to_addr, "amount": Decimal(amount)}])
- self.wait_and_assert_operationid_status(from_node, opid)
+ wait_and_assert_operationid_status(from_node, opid)
self.sync_all()
miner.generate(1)
self.sync_all()
# Shield Alice's coinbase funds to her zaddr
alice_zaddr = alice.z_getnewaddress()
res = alice.z_shieldcoinbase("*", alice_zaddr)
- self.wait_and_assert_operationid_status(alice, res['opid'])
+ wait_and_assert_operationid_status(alice, res['opid'])
self.sync_all()
miner.generate(1)
self.sync_all()