Source code for bbclib.libs.bbclib_transaction

# -*- coding: utf-8 -*-
"""
Copyright (c) 2017 beyond-blockchain.org.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import sys
import os
import platform
import binascii
import hashlib
import msgpack
import random
import time
import traceback
from collections import Mapping

current_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.append(os.path.join(current_dir, "../.."))

from bbclib.libs import bbclib_utils
from bbclib.libs import bbclib_error
from bbclib.libs.bbclib_config import DEFAULT_CURVETYPE
from bbclib.libs.bbclib_keypair import KeyPair
from bbclib.libs.bbclib_signature import BBcSignature
from bbclib.libs.bbclib_relation import BBcRelation
from bbclib.libs.bbclib_reference import BBcReference
from bbclib.libs.bbclib_event import BBcEvent
from bbclib.libs.bbclib_witness import BBcWitness
from bbclib.libs.bbclib_crossref import BBcCrossRef
import bbclib
from bbclib import id_length_conf


[docs]class BBcTransaction: """Transaction object""" WITH_WIRE = False # for backward compatibility def __init__(self, version=1, unpack=None, id_length=None): if id_length is not None: bbclib.configure_id_length_all(id_length) self.version = version self.timestamp = int(time.time() * 1000) # milliseconds self.events = [] self.references = [] self.relations = [] self.witness = None self.cross_ref = None self.signatures = [] self.userid_sigidx_mapping = dict() self.transaction_id = None self.transaction_base_digest = None self.transaction_data = None self.asset_group_ids = dict() if unpack is not None: self.unpack(unpack) def __str__(self): ret = "------- Dump of the transaction data ------\n" ret += "* transaction_id: %s\n" % bbclib_utils.str_binary(self.transaction_id) ret += "version: %d\n" % self.version ret += "timestamp: %d\n" % self.timestamp if self.version != 0: ret += "id_length of transaction_id: %d\n" % id_length_conf["transaction_id"] ret += "Event[]: %d\n" % len(self.events) for i, evt in enumerate(self.events): ret += " [%d]\n" % i ret += str(evt) ret += "Reference[]: %d\n" % len(self.references) for i, refe in enumerate(self.references): ret += " [%d]\n" % i ret += str(refe) ret += "Relation[]: %d\n" % len(self.relations) for i, rtn in enumerate(self.relations): ret += " [%d]\n" % i ret += str(rtn) if self.witness is None: ret += "Witness: None\n" else: ret += str(self.witness) if self.cross_ref is None: ret += "Cross_Ref: None\n" else: ret += str(self.cross_ref) ret += "Signature[]: %d\n" % len(self.signatures) for i, sig in enumerate(self.signatures): ret += " [%d]\n" % i ret += str(sig) return ret
[docs] def add(self, event=None, reference=None, relation=None, witness=None, cross_ref=None): """Add parts""" if event is not None: if isinstance(event, list): self.events.extend(event) else: self.events.append(event) if reference is not None: if isinstance(reference, list): self.references.extend(reference) else: self.references.append(reference) if relation is not None: if isinstance(relation, list): self.relations.extend(relation) else: self.relations.append(relation) for rtn in self.relations: rtn.version = self.version if witness is not None: witness.transaction = self self.witness = witness if cross_ref is not None: self.cross_ref = cross_ref return True
[docs] def get_sig_index(self, user_id): """Reserve a space for signature for the specified user_id Args: user_id (bytes): user_id whose signature will be added to the signature part Returns: int: position (index) in the signature part """ if user_id not in self.userid_sigidx_mapping: self.userid_sigidx_mapping[user_id] = len(self.userid_sigidx_mapping) self.signatures.append(BBcSignature()) return self.userid_sigidx_mapping[user_id]
[docs] def set_sig_index(self, user_id, idx): """Map a user_id with the index of signature list Args: user_id (bytes): user_id whose signature will be added to the signature part idx (int): index number """ if user_id in self.userid_sigidx_mapping: return self.userid_sigidx_mapping[user_id] = idx
[docs] def add_signature(self, user_id=None, signature=None): """Add signature in the reserved space Args: user_id (bytes): user_id of the signature owner signature (BBcSignature): signature Returns: bool: True if successful """ if user_id not in self.userid_sigidx_mapping: return False idx = self.userid_sigidx_mapping[user_id] signature.not_initialized = False self.signatures[idx] = signature return True
[docs] def digest(self): """Calculate the digest The digest corresponds to the transaction_id of this object Returns: bytes: transaction_id (or digest) """ target = self.pack(for_id=True) d = hashlib.sha256(target).digest() self.transaction_id = d[:id_length_conf["transaction_id"]] return d
[docs] def pack(self, for_id=False): """Pack the whole parts""" dat = bytearray(bbclib_utils.to_4byte(self.version)) dat.extend(bbclib_utils.to_8byte(self.timestamp)) if self.version != 0: dat.extend(bbclib_utils.to_2byte(id_length_conf["transaction_id"])) dat.extend(bbclib_utils.to_2byte(len(self.events))) for i in range(len(self.events)): evt = self.events[i].pack() dat.extend(bbclib_utils.to_4byte(len(evt))) dat.extend(evt) dat.extend(bbclib_utils.to_2byte(len(self.references))) for i in range(len(self.references)): refe = self.references[i].pack() dat.extend(bbclib_utils.to_4byte(len(refe))) dat.extend(refe) dat.extend(bbclib_utils.to_2byte(len(self.relations))) for i in range(len(self.relations)): rtn = self.relations[i].pack() dat.extend(bbclib_utils.to_4byte(len(rtn))) dat.extend(rtn) if self.witness is not None: dat.extend(bbclib_utils.to_2byte(1)) witness = self.witness.pack() dat.extend(bbclib_utils.to_4byte(len(witness))) dat.extend(witness) else: dat.extend(bbclib_utils.to_2byte(0)) self.transaction_base_digest = hashlib.sha256(dat).digest() dat_cross = bytearray() if self.cross_ref is not None: cross = self.cross_ref.pack() dat_cross.extend(bbclib_utils.to_2byte(1)) dat_cross.extend(bbclib_utils.to_4byte(len(cross))) dat_cross.extend(cross) else: dat_cross.extend(bbclib_utils.to_2byte(0)) if for_id: dat_for_id = bytearray(self.transaction_base_digest) dat_for_id.extend(dat_cross) return bytes(dat_for_id) dat.extend(dat_cross) dat.extend(bbclib_utils.to_2byte(len(self.signatures))) for signature in self.signatures: sig = signature.pack() dat.extend(bbclib_utils.to_4byte(len(sig))) dat.extend(sig) self.transaction_data = bytes(dat) return self.transaction_data
[docs] def unpack(self, data): """Unpack into this object Args: data (bytes): packed binary data Returns: bool: True if successful """ self.transaction_data = data[:] ptr = 0 data_size = len(data) try: ptr, self.version = bbclib_utils.get_n_byte_int(ptr, 4, data) ptr, self.timestamp = bbclib_utils.get_n_byte_int(ptr, 8, data) if self.version != 0: ptr, id_length = bbclib_utils.get_n_byte_int(ptr, 2, data) id_length_conf["transaction_id"] = id_length ptr, evt_num = bbclib_utils.get_n_byte_int(ptr, 2, data) self.events = [] for i in range(evt_num): ptr, size = bbclib_utils.get_n_byte_int(ptr, 4, data) ptr, evtdata = bbclib_utils.get_n_bytes(ptr, size, data) evt = BBcEvent() if not evt.unpack(evtdata): return False self.events.append(evt) if ptr >= data_size: return False self.asset_group_ids[evt.asset.asset_id] = evt.asset_group_id ptr, ref_num = bbclib_utils.get_n_byte_int(ptr, 2, data) self.references = [] for i in range(ref_num): ptr, size = bbclib_utils.get_n_byte_int(ptr, 4, data) ptr, refdata = bbclib_utils.get_n_bytes(ptr, size, data) refe = BBcReference(None, self) if not refe.unpack(refdata): return False self.references.append(refe) if ptr >= data_size: return False ptr, rtn_num = bbclib_utils.get_n_byte_int(ptr, 2, data) self.relations = [] for i in range(rtn_num): ptr, size = bbclib_utils.get_n_byte_int(ptr, 4, data) ptr, rtndata = bbclib_utils.get_n_bytes(ptr, size, data) rtn = BBcRelation() if not rtn.unpack(rtndata, self.version): return False self.relations.append(rtn) if ptr >= data_size: return False if rtn.asset is not None: self.asset_group_ids[rtn.asset.asset_id] = rtn.asset_group_id if rtn.asset_raw is not None: self.asset_group_ids[rtn.asset_raw.asset_id] = rtn.asset_group_id if rtn.asset_hash is not None: for h in rtn.asset_hash.asset_ids: self.asset_group_ids[h] = rtn.asset_group_id ptr, witness_num = bbclib_utils.get_n_byte_int(ptr, 2, data) if witness_num == 0: self.witness = None else: ptr, size = bbclib_utils.get_n_byte_int(ptr, 4, data) ptr, witnessdata = bbclib_utils.get_n_bytes(ptr, size, data) self.witness = BBcWitness() self.witness.transaction = self if not self.witness.unpack(witnessdata): return False ptr, cross_num = bbclib_utils.get_n_byte_int(ptr, 2, data) if cross_num == 0: self.cross_ref = None else: ptr, size = bbclib_utils.get_n_byte_int(ptr, 4, data) ptr, crossdata = bbclib_utils.get_n_bytes(ptr, size, data) self.cross_ref = BBcCrossRef() if not self.cross_ref.unpack(crossdata): return False ptr, sig_num = bbclib_utils.get_n_byte_int(ptr, 2, data) self.signatures = [] for i in range(sig_num): ptr, size = bbclib_utils.get_n_byte_int(ptr, 4, data) sig = BBcSignature() if size > 4: ptr, sigdata = bbclib_utils.get_n_bytes(ptr, size, data) if not sig.unpack(sigdata): return False self.signatures.append(sig) if ptr > data_size: return False self.digest() except Exception as e: print("Transaction data unpack: %s" % e) print(traceback.format_exc()) return False return True
[docs] def sign(self, key_type=DEFAULT_CURVETYPE, private_key=None, public_key=None, keypair=None, no_pubkey=False): """Sign the transaction Args: key_type (int): Type of encryption key's curve private_key (bytes): public_key (bytes): keypair (KeyPair): keypair or set of private_key and public_key needs to be given no_pubkey (bool): If True, public key is not contained in the BBcSignature object (needs to be given externally when verification) Returns: BBcSignature: """ bbclib._reset_error() if keypair is None: if len(private_key) != 32 or len(public_key) <= 32: bbclib._set_error(code=bbclib_error.EBADKEYPAIR, txt="Bad private_key/public_key (must be in bytes format)") return None keypair = KeyPair(curvetype=key_type, privkey=private_key, pubkey=public_key) if keypair is None: bbclib._set_error(code=bbclib_error.EBADKEYPAIR, txt="Bad private_key/public_key") return None sig = BBcSignature(key_type=keypair.curvetype) s = keypair.sign(self.digest()) if s is None: bbclib._set_error(code=bbclib_error.EOTHER, txt="sig_type %d is not supported" % keypair.curvetype) return None if no_pubkey: sig.add(signature=s) else: sig.add(signature=s, pubkey=keypair.public_key) return sig