《区块链编程》第五章

# 交易

## 交易的组成

1. 版本号
2. 输入
   1. 父交易的id
   2. 父交易output的序号
   3. 签名脚本
   4. 序列号
3. 输出
   1. amount
   2. 公钥脚本
4. 时间锁

## 练习1

p88

### 代码实现

```python
from helper import little_endian_to_int
from io import BytesIO

class Tx:
    '''A transaction: version, inputs, outputs, locktime and a network flag.'''

    def __init__(self, version, tx_ins, tx_outs, locktime, testnet=False):
        '''Store the transaction fields exactly as given.'''
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    def __repr__(self):
        '''Return a multi-line summary: id, version, inputs, outputs, locktime.'''
        tx_ins = ''.join(repr(tx_in) + '\n' for tx_in in self.tx_ins)
        tx_outs = ''.join(repr(tx_out) + '\n' for tx_out in self.tx_outs)
        return 'tx: {}\nversion: {}\ntx_ins:\n{}tx_outs:\n{}locktime: {}'.format(
            self.id(),
            self.version,
            tx_ins,
            tx_outs,
            self.locktime,
        )

    def id(self):
        '''Return the transaction hash as a hexadecimal string.'''
        return self.hash().hex()

    def hash(self):
        '''Binary hash of the legacy serialization'''
        # NOTE(review): neither hash256 nor serialize() is defined or imported
        # in this snippet, so id()/hash()/__repr__ cannot run here as-is.
        return hash256(self.serialize())[::-1]

    @classmethod
    def parse(cls, s, testnet=False):
        '''Takes a byte stream and parses the transaction at the start
        return a Tx object

        Only the version is read at this stage; inputs, outputs and locktime
        are passed to the constructor as None.
        '''
        # version is an integer in 4 bytes, little-endian
        version = little_endian_to_int(s.read(4))
        return cls(version, None, None, None, testnet=testnet)
if __name__ == '__main__':
    # Four bytes of hex: just enough input for the version field.
    tx = '01000000'
    stream = BytesIO(bytes.fromhex(tx))
    print(Tx.parse(stream).version)
``````

### 测试

```
1
[Finished in 306ms]

``````

## 练习2

p92

### 代码实现

```python
# -*- coding: utf-8 -*-
# @Author: 从化北(喵星人)
# @Date:   2022-01-04 15:38:18
from io import BytesIO
from script import Script

class Tx:
    '''A transaction: version, inputs, outputs, locktime and a network flag.'''

    def __init__(self, version, tx_ins, tx_outs, locktime, testnet=False):
        '''Store the transaction fields exactly as given.'''
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    @classmethod
    def parse(cls, s, testnet=False):
        '''Takes a byte stream and parses the transaction at the start
        return a Tx object

        Reads the version and the inputs; outputs and locktime are not parsed
        at this stage and are passed to the constructor as None.
        '''
        # Imported locally because this snippet's header does not import them.
        from helper import little_endian_to_int, read_varint

        # version is an integer in 4 bytes, little-endian
        version = little_endian_to_int(s.read(4))
        # num_inputs is a varint, followed by that many serialized inputs
        num_inputs = read_varint(s)
        inputs = [TxIn.parse(s) for _ in range(num_inputs)]
        return cls(version, inputs, None, None, testnet=testnet)

class TxIn:
    '''A transaction input: a reference to a previous output plus its ScriptSig.'''

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xffffffff):
        '''Store the input fields; an omitted script_sig becomes an empty Script.'''
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        self.script_sig = Script() if script_sig is None else script_sig
        self.sequence = sequence

    def __repr__(self):
        '''Return "<previous tx hash in hex>:<previous output index>".'''
        return '{}:{}'.format(self.prev_tx.hex(), self.prev_index)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_input at the start
        return a TxIn object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # prev_tx is 32 bytes, little endian; reverse it for in-memory use
        prev_tx = s.read(32)[::-1]
        # prev_index is an integer in 4 bytes, little endian
        prev_index = little_endian_to_int(s.read(4))
        script_sig = Script.parse(s)
        # sequence is an integer in 4 bytes, little-endian
        sequence = little_endian_to_int(s.read(4))
        return cls(prev_tx, prev_index, script_sig, sequence)

if __name__ == '__main__':
    # NOTE(review): raw_tx is never assigned in this snippet; its definition
    # (presumably the serialized transaction as bytes) appears to be missing
    # and must be restored before this script can run.
    stream = BytesIO(raw_tx)
    tx = Tx.parse(stream)
    print(tx.tx_ins)
``````

### 测试

```
[d1c789a9c60383bf715f3f6ad9d14b91fe55f3deb369fe5d9280cb1a01793f81:0]
[Finished in 309ms]
``````

## 练习3

p94

### 代码实现

```python
# -*- coding: utf-8 -*-
# @Author: 从化北(喵星人)
# @Date:   2022-01-04 16:58:44
from io import BytesIO
from script import Script

class Tx:
    '''A transaction: version, inputs, outputs, locktime and a network flag.'''

    def __init__(self, version, tx_ins, tx_outs, locktime, testnet=False):
        '''Store the transaction fields exactly as given.'''
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    @classmethod
    def parse(cls, s, testnet=False):
        '''Takes a byte stream and parses the transaction at the start
        return a Tx object

        Reads the version, inputs and outputs; locktime is not parsed at this
        stage and is passed to the constructor as None.
        '''
        # Imported locally because this snippet's header does not import them.
        from helper import little_endian_to_int, read_varint

        # version is an integer in 4 bytes, little-endian
        version = little_endian_to_int(s.read(4))
        # num_inputs is a varint, followed by that many serialized inputs
        num_inputs = read_varint(s)
        inputs = [TxIn.parse(s) for _ in range(num_inputs)]
        # num_outputs is a varint, followed by that many serialized outputs
        num_outputs = read_varint(s)
        outputs = [TxOut.parse(s) for _ in range(num_outputs)]
        return cls(version, inputs, outputs, None, testnet=testnet)

class TxIn:
    '''A transaction input: a reference to a previous output plus its ScriptSig.'''

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xffffffff):
        '''Store the input fields; an omitted script_sig becomes an empty Script.'''
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        self.script_sig = Script() if script_sig is None else script_sig
        self.sequence = sequence

    def __repr__(self):
        '''Return "<previous tx hash in hex>:<previous output index>".'''
        return '{}:{}'.format(self.prev_tx.hex(), self.prev_index)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_input at the start
        return a TxIn object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # prev_tx is 32 bytes, little endian; reverse it for in-memory use
        prev_tx = s.read(32)[::-1]
        # prev_index is an integer in 4 bytes, little endian
        prev_index = little_endian_to_int(s.read(4))
        script_sig = Script.parse(s)
        # sequence is an integer in 4 bytes, little-endian
        sequence = little_endian_to_int(s.read(4))
        return cls(prev_tx, prev_index, script_sig, sequence)

class TxOut:
    '''A transaction output: an amount and the ScriptPubKey that locks it.'''

    def __init__(self, amount, script_pubkey):
        '''Store the amount and the locking script.'''
        self.amount = amount
        self.script_pubkey = script_pubkey

    def __repr__(self):
        '''Return "<amount>:<script_pubkey>".'''
        return '{}:{}'.format(self.amount, self.script_pubkey)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_output at the start
        return a TxOut object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # amount is an integer in 8 bytes, little endian
        amount = little_endian_to_int(s.read(8))
        # use Script.parse to get the ScriptPubKey
        script_pubkey = Script.parse(s)
        return cls(amount, script_pubkey)

    def serialize(self):
        '''Returns the byte serialization of the transaction output'''
        # Imported locally because this snippet's header does not import it.
        from helper import int_to_little_endian

        result = int_to_little_endian(self.amount, 8)
        result += self.script_pubkey.serialize()
        return result

if __name__ == '__main__':
    # NOTE(review): raw_tx is never assigned in this snippet; its definition
    # (presumably the serialized transaction as bytes) appears to be missing
    # and must be restored before this script can run.
    stream = BytesIO(raw_tx)
    tx = Tx.parse(stream)
    print(tx.tx_outs)

``````

### 测试

```
[32454049:OP_DUP OP_HASH160 bc3b654dca7e56b04dca18f2566cdaf02e8d9ada OP_EQUALVERIFY OP_CHECKSIG, 10011545:OP_DUP OP_HASH160 1c4bc762dd5423e332166702cb75f40df79fea12 OP_EQUALVERIFY OP_CHECKSIG]
[Finished in 319ms]
``````

## 练习4

p95

### 代码实现

```python
# -*- coding: utf-8 -*-
# @Author: 从化北(喵星人)
# @Date:   2022-01-04 17:04:41
from io import BytesIO
from script import Script

class Tx:
    '''A transaction: version, inputs, outputs, locktime and a network flag.'''

    def __init__(self, version, tx_ins, tx_outs, locktime, testnet=False):
        '''Store the transaction fields exactly as given.'''
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    @classmethod
    def parse(cls, s, testnet=False):
        '''Takes a byte stream and parses the transaction at the start
        return a Tx object
        '''
        # Imported locally because this snippet's header does not import them.
        from helper import little_endian_to_int, read_varint

        # version is an integer in 4 bytes, little-endian
        version = little_endian_to_int(s.read(4))
        # num_inputs is a varint, followed by that many serialized inputs
        num_inputs = read_varint(s)
        inputs = [TxIn.parse(s) for _ in range(num_inputs)]
        # num_outputs is a varint, followed by that many serialized outputs
        num_outputs = read_varint(s)
        outputs = [TxOut.parse(s) for _ in range(num_outputs)]
        # locktime is an integer in 4 bytes, little-endian
        locktime = little_endian_to_int(s.read(4))
        return cls(version, inputs, outputs, locktime, testnet=testnet)

class TxIn:
    '''A transaction input: a reference to a previous output plus its ScriptSig.'''

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xffffffff):
        '''Store the input fields; an omitted script_sig becomes an empty Script.'''
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        self.script_sig = Script() if script_sig is None else script_sig
        self.sequence = sequence

    def __repr__(self):
        '''Return "<previous tx hash in hex>:<previous output index>".'''
        return '{}:{}'.format(self.prev_tx.hex(), self.prev_index)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_input at the start
        return a TxIn object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # prev_tx is 32 bytes, little endian; reverse it for in-memory use
        prev_tx = s.read(32)[::-1]
        # prev_index is an integer in 4 bytes, little endian
        prev_index = little_endian_to_int(s.read(4))
        script_sig = Script.parse(s)
        # sequence is an integer in 4 bytes, little-endian
        sequence = little_endian_to_int(s.read(4))
        return cls(prev_tx, prev_index, script_sig, sequence)

class TxOut:
    '''A transaction output: an amount and the ScriptPubKey that locks it.'''

    def __init__(self, amount, script_pubkey):
        '''Store the amount and the locking script.'''
        self.amount = amount
        self.script_pubkey = script_pubkey

    def __repr__(self):
        '''Return "<amount>:<script_pubkey>".'''
        return '{}:{}'.format(self.amount, self.script_pubkey)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_output at the start
        return a TxOut object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # amount is an integer in 8 bytes, little endian
        amount = little_endian_to_int(s.read(8))
        # use Script.parse to get the ScriptPubKey
        script_pubkey = Script.parse(s)
        return cls(amount, script_pubkey)

    def serialize(self):
        '''Returns the byte serialization of the transaction output'''
        # Imported locally because this snippet's header does not import it.
        from helper import int_to_little_endian

        result = int_to_little_endian(self.amount, 8)
        result += self.script_pubkey.serialize()
        return result

if __name__ == '__main__':
    # NOTE(review): raw_tx is never assigned in this snippet; its definition
    # (presumably the serialized transaction as bytes) appears to be missing
    # and must be restored before this script can run.
    stream = BytesIO(raw_tx)
    tx = Tx.parse(stream)
    print(tx.locktime)

``````

### 运行结果

```
410393
[Finished in 320ms]
``````

## 练习5

p95

### 代码实现

```python
# -*- coding: utf-8 -*-
# @Author: 从化北(喵星人)
# @Date:   2022-01-04 17:06:15
from io import BytesIO
from script import Script

class Tx:
    '''A transaction: version, inputs, outputs, locktime and a network flag.'''

    def __init__(self, version, tx_ins, tx_outs, locktime, testnet=False):
        '''Store the transaction fields exactly as given.'''
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    @classmethod
    def parse(cls, s, testnet=False):
        '''Takes a byte stream and parses the transaction at the start
        return a Tx object
        '''
        # Imported locally because this snippet's header does not import them.
        from helper import little_endian_to_int, read_varint

        # version is an integer in 4 bytes, little-endian
        version = little_endian_to_int(s.read(4))
        # num_inputs is a varint, followed by that many serialized inputs
        num_inputs = read_varint(s)
        inputs = [TxIn.parse(s) for _ in range(num_inputs)]
        # num_outputs is a varint, followed by that many serialized outputs
        num_outputs = read_varint(s)
        outputs = [TxOut.parse(s) for _ in range(num_outputs)]
        # locktime is an integer in 4 bytes, little-endian
        locktime = little_endian_to_int(s.read(4))
        return cls(version, inputs, outputs, locktime, testnet=testnet)

class TxIn:
    '''A transaction input: a reference to a previous output plus its ScriptSig.'''

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xffffffff):
        '''Store the input fields; an omitted script_sig becomes an empty Script.'''
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        self.script_sig = Script() if script_sig is None else script_sig
        self.sequence = sequence

    def __repr__(self):
        '''Return "<previous tx hash in hex>:<previous output index>".'''
        return '{}:{}'.format(self.prev_tx.hex(), self.prev_index)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_input at the start
        return a TxIn object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # prev_tx is 32 bytes, little endian; reverse it for in-memory use
        prev_tx = s.read(32)[::-1]
        # prev_index is an integer in 4 bytes, little endian
        prev_index = little_endian_to_int(s.read(4))
        script_sig = Script.parse(s)
        # sequence is an integer in 4 bytes, little-endian
        sequence = little_endian_to_int(s.read(4))
        return cls(prev_tx, prev_index, script_sig, sequence)

class TxOut:
    '''A transaction output: an amount and the ScriptPubKey that locks it.'''

    def __init__(self, amount, script_pubkey):
        '''Store the amount and the locking script.'''
        self.amount = amount
        self.script_pubkey = script_pubkey

    def __repr__(self):
        '''Return "<amount>:<script_pubkey>".'''
        return '{}:{}'.format(self.amount, self.script_pubkey)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_output at the start
        return a TxOut object
        '''
        # Imported locally because this snippet's header does not import it.
        from helper import little_endian_to_int

        # amount is an integer in 8 bytes, little endian
        amount = little_endian_to_int(s.read(8))
        # use Script.parse to get the ScriptPubKey
        script_pubkey = Script.parse(s)
        return cls(amount, script_pubkey)

    def serialize(self):
        '''Returns the byte serialization of the transaction output'''
        # Imported locally because this snippet's header does not import it.
        from helper import int_to_little_endian

        result = int_to_little_endian(self.amount, 8)
        result += self.script_pubkey.serialize()
        return result

# NOTE(review): the statement that opens the hex literal below and assigns
# raw_tx is not present here, and the hex rows themselves may be incomplete.
# Restore both from the original transaction before running this script.
if __name__ == '__main__':
148a1fc61c6ed7a069e010000006a47304402204585bcdef85e6b1c6af5c2669d4830ff86e42dd
a13a7b71aa8180f012102f0da57e85eec2934a82a585ea337ce2f4998b50ae699dd79f5880e253
dafafb7feffffffeb8f51f4038dc17e6313cf831d4f02281c2a468bde0fafd37f1bf882729e7fd
3000000006a47304402207899531a52d59a6de200179928ca900254a36b8dff8bb75f5f5d71b1c
dc26125022008b422690b8461cb52c3cc30330b23d574351872b7c361e9aae3649071c1a716012
1035d5c93d9ac96881f19ba1f686f15f009ded7c62efe85a872e6a19b43c15a2937feffffff567
bf40595119d1bb8a3037c356efd56170b64cbcc160fb028fa10704b45d775000000006a4730440
172b7989aec8850aa0dae49abfb84c81ae6e5b251a58ace5cfeffffffd63a5e6c16e620f86f375
852028751635dcee2be669c2a1686a4b5edf304012103ffd6f4a67e94aba353a00882e563ff272
b6dbf67d4750b0a56244948a87988ac005a6202000000001976a9143c82d7df364eb6c75be8c80
df2b3eda8db57397088ac46430600')
stream = BytesIO(raw_tx)
tx = Tx.parse(stream)
# print(tx.tx_ins)
print("第二个input的签名脚本：")
print(tx.tx_ins[1].script_sig)
print("第一个output的公钥脚本：")
print(tx.tx_outs[0].script_pubkey)
print("第二个output的数量：")
print(tx.tx_outs[1].amount)

``````

### 运行结果

```
第二个input的签名脚本：
304402207899531a52d59a6de200179928ca900254a36b8dff8bb75f5f5d71b1cdc26125022008b422690b8461cb52c3cc30330b23d574351872b7c361e9aae3649071c1a71601 035d5c93d9ac96881f19ba1f686f15f009ded7c62efe85a872e6a19b43c15a2937
第一个output的公钥脚本：
OP_DUP OP_HASH160 ab0c0b2e98b1ab6dbf67d4750b0a56244948a879 OP_EQUALVERIFY OP_CHECKSIG
第二个output的数量：
40000000
[Finished in 319ms]
``````

## 练习6

p98

### 代码实现

```python
# -*- coding: utf-8 -*-
# @Author: 从化北(喵星人)
# @Date:   2022-01-04 17:12:19

class Tx:
    # The rest of the class is elided in this excerpt.
    ...

    def fee(self, testnet=False):
        '''Return the fee: the sum of input values minus the sum of output amounts.

        testnet is forwarded to each input's value() lookup.
        '''
        input_sum = sum(tx_in.value(testnet=testnet) for tx_in in self.tx_ins)
        output_sum = sum(tx_out.amount for tx_out in self.tx_outs)
        return input_sum - output_sum

``````

### 运行结果

```
无
``````

## 本章中涉及的tx完整代码

### 书中给的tx

```python
from io import BytesIO
from unittest import TestCase

import json
import requests

from helper import (
    encode_varint,
    hash256,
    int_to_little_endian,
    little_endian_to_int,
    read_varint,
    run,
)
from script import Script

# tag::source7[]
class TxFetcher:
    '''Fetch transactions over HTTP and keep them in a class-level cache.'''

    # Maps transaction id (hex string) to the parsed Tx; shared by all callers.
    cache = {}

    @classmethod
    def get_url(cls, testnet=False):
        '''Return the API base URL for testnet or mainnet.'''
        if testnet:
            return 'https://blockstream.info/testnet/api/'
        return 'https://blockstream.info/api/'

    @classmethod
    def _parse_raw(cls, raw, testnet=False):
        '''Parse raw transaction bytes into a Tx, dropping bytes 4-5 when byte 4 is zero.'''
        if raw[4] == 0:
            # NOTE(review): presumably the segwit marker/flag pair that follows
            # the 4-byte version; confirm against the serialization format.
            raw = raw[:4] + raw[6:]
            tx = Tx.parse(BytesIO(raw), testnet=testnet)
            # Extra data follows the outputs in this form, so the locktime is
            # taken from the final four bytes rather than the parse position.
            tx.locktime = little_endian_to_int(raw[-4:])
            return tx
        return Tx.parse(BytesIO(raw), testnet=testnet)

    @classmethod
    def fetch(cls, tx_id, testnet=False, fresh=False):
        '''Return the Tx for tx_id, downloading it unless it is already cached.

        fresh=True forces a new download. Raises ValueError when the response
        is not hex or when the parsed transaction's id differs from tx_id.
        '''
        if fresh or (tx_id not in cls.cache):
            # get_url() ends with '/', so strip it to avoid '//' in the path.
            base_url = cls.get_url(testnet).rstrip('/')
            url = '{}/tx/{}/hex'.format(base_url, tx_id)
            response = requests.get(url)
            try:
                raw = bytes.fromhex(response.text.strip())
            except ValueError:
                raise ValueError('unexpected response: {}'.format(response.text))
            tx = cls._parse_raw(raw, testnet=testnet)
            # Guard against the server returning a different transaction.
            if tx.id() != tx_id:
                raise ValueError('not the same id: {} vs {}'.format(tx.id(),
                                                                  tx_id))
            cls.cache[tx_id] = tx
        cls.cache[tx_id].testnet = testnet
        return cls.cache[tx_id]

    @classmethod
    def load_cache(cls, filename):
        '''Fill the cache from a JSON file mapping transaction id to raw hex.'''
        with open(filename, 'r') as f:
            disk_cache = json.load(f)
        for k, raw_hex in disk_cache.items():
            cls.cache[k] = cls._parse_raw(bytes.fromhex(raw_hex))

    @classmethod
    def dump_cache(cls, filename):
        '''Write the cache to filename as JSON mapping transaction id to raw hex.'''
        with open(filename, 'w') as f:
            to_dump = {k: tx.serialize().hex() for k, tx in cls.cache.items()}
            s = json.dumps(to_dump, sort_keys=True, indent=4)
            f.write(s)

# tag::source1[]
class Tx:
    '''A transaction: version, inputs, outputs, locktime and a network flag.'''

    def __init__(self, version, tx_ins, tx_outs, locktime, testnet=False):
        '''Store the transaction fields exactly as given.'''
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    def __repr__(self):
        '''Return a multi-line summary: id, version, inputs, outputs, locktime.'''
        tx_ins = ''.join(repr(tx_in) + '\n' for tx_in in self.tx_ins)
        tx_outs = ''.join(repr(tx_out) + '\n' for tx_out in self.tx_outs)
        return 'tx: {}\nversion: {}\ntx_ins:\n{}tx_outs:\n{}locktime: {}'.format(
            self.id(),
            self.version,
            tx_ins,
            tx_outs,
            self.locktime,
        )

    def id(self):
        '''Return the transaction hash as a hexadecimal string.'''
        return self.hash().hex()

    def hash(self):
        '''Binary hash of the legacy serialization'''
        # The digest is byte-reversed before being returned.
        return hash256(self.serialize())[::-1]

    @classmethod
    def parse(cls, s, testnet=False):
        '''Takes a byte stream and parses the transaction at the start
        return a Tx object
        '''
        # s.read(n) will return n bytes
        # version is an integer in 4 bytes, little-endian
        version = little_endian_to_int(s.read(4))
        # num_inputs is a varint, followed by that many serialized inputs
        num_inputs = read_varint(s)
        inputs = [TxIn.parse(s) for _ in range(num_inputs)]
        # num_outputs is a varint, followed by that many serialized outputs
        num_outputs = read_varint(s)
        outputs = [TxOut.parse(s) for _ in range(num_outputs)]
        # locktime is an integer in 4 bytes, little-endian
        locktime = little_endian_to_int(s.read(4))
        return cls(version, inputs, outputs, locktime, testnet=testnet)

    def serialize(self):
        '''Returns the byte serialization of the transaction'''
        result = int_to_little_endian(self.version, 4)
        result += encode_varint(len(self.tx_ins))
        for tx_in in self.tx_ins:
            result += tx_in.serialize()
        result += encode_varint(len(self.tx_outs))
        for tx_out in self.tx_outs:
            result += tx_out.serialize()
        result += int_to_little_endian(self.locktime, 4)
        return result

    def fee(self):
        '''Returns the fee of this transaction in satoshi'''
        # fee is input sum - output sum; the input values are looked up on
        # the network this transaction was created for (self.testnet).
        input_sum = sum(
            tx_in.value(testnet=self.testnet) for tx_in in self.tx_ins
        )
        output_sum = sum(tx_out.amount for tx_out in self.tx_outs)
        return input_sum - output_sum

# tag::source2[]
class TxIn:
    '''A transaction input: a reference to a previous output plus its ScriptSig.'''

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xffffffff):
        '''Store the input fields; an omitted script_sig becomes an empty Script.'''
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        self.script_sig = Script() if script_sig is None else script_sig
        self.sequence = sequence

    def __repr__(self):
        '''Return "<previous tx hash in hex>:<previous output index>".'''
        return '{}:{}'.format(self.prev_tx.hex(), self.prev_index)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_input at the start
        return a TxIn object
        '''
        # prev_tx is 32 bytes, little endian; reversed here, and serialize()
        # reverses it back when writing.
        prev_tx = s.read(32)[::-1]
        # prev_index is an integer in 4 bytes, little endian
        prev_index = little_endian_to_int(s.read(4))
        # use Script.parse to get the ScriptSig
        script_sig = Script.parse(s)
        # sequence is an integer in 4 bytes, little-endian
        sequence = little_endian_to_int(s.read(4))
        return cls(prev_tx, prev_index, script_sig, sequence)

    def serialize(self):
        '''Returns the byte serialization of the transaction input'''
        result = self.prev_tx[::-1]
        result += int_to_little_endian(self.prev_index, 4)
        result += self.script_sig.serialize()
        result += int_to_little_endian(self.sequence, 4)
        return result

    def fetch_tx(self, testnet=False):
        '''Return the previous transaction, looked up by its hex id via TxFetcher.'''
        return TxFetcher.fetch(self.prev_tx.hex(), testnet=testnet)

    def value(self, testnet=False):
        '''Get the output value by looking up the tx hash.
        Returns the amount in satoshi.
        '''
        tx = self.fetch_tx(testnet=testnet)
        return tx.tx_outs[self.prev_index].amount

    def script_pubkey(self, testnet=False):
        '''Get the ScriptPubKey by looking up the tx hash.
        Returns a Script object.
        '''
        tx = self.fetch_tx(testnet=testnet)
        return tx.tx_outs[self.prev_index].script_pubkey

# tag::source3[]
class TxOut:
    '''A transaction output: an amount and the ScriptPubKey that locks it.'''

    def __init__(self, amount, script_pubkey):
        '''Store the amount and the locking script.'''
        self.amount = amount
        self.script_pubkey = script_pubkey

    def __repr__(self):
        '''Return "<amount>:<script_pubkey>".'''
        return '{}:{}'.format(self.amount, self.script_pubkey)

    @classmethod
    def parse(cls, s):
        '''Takes a byte stream and parses the tx_output at the start
        return a TxOut object
        '''
        # amount is an integer in 8 bytes, little endian
        amount = little_endian_to_int(s.read(8))
        # use Script.parse to get the ScriptPubKey
        script_pubkey = Script.parse(s)
        return cls(amount, script_pubkey)

    def serialize(self):
        '''Returns the byte serialization of the transaction output'''
        result = int_to_little_endian(self.amount, 8)
        result += self.script_pubkey.serialize()
        return result

``````

THE END

)">