# 挑战杯 python区块链实现 – proof of work工作量证明共识算法

## 0 前言

🔥 优质竞赛项目系列，今天要分享的是

python区块链实现 - proof of work工作量证明共识算法

🧿 更多资料, 项目分享：

## 1 区块链基础

### 1.1 比特币内部结构

• previous hash(前一个区块的hash)
• merkle root(默克尔树根节点,内部存储交易数据)
• timestamp(当前区块生成的时间)
• nonce(旷工计算hash值次数)

### 1.2 实现的区块链数据结构

• index 当前第几个区块
• timestamp 该区块创建时的时间戳
• data 交易信息
• previousHash 前一个区块的hash
• hash 当前区块的hash

### 1.3 注意点

``````

from hashlib import sha256
//区块schema
class Block:

def __init__(self,index,timestamp,data,previousHash=""):

self.index = index
self.timestamp = timestamp
self.data = data
self.previousHash = previousHash
self.hash = self.calculateHash()

//计算当前区块的hash值
def calculateHash(self):
plainData = str(self.index)+str(self.timestamp)+str(self.data)
return sha256(plainData.encode('utf-8')).hexdigest()

def __str__(self):
return str(self.__dict__)
//区块链schema
class BlockChain:
//初始化的时候 创建 创世区块
def __init__(self):
self.chain = [self.createGenesisBlock()]
//构建创世区块
def createGenesisBlock(self):
return Block(0,"01/01/2018","genesis block","0")
//获取最后一个区块
def getLatestBlock(self):
return self.chain[len(self.chain)-1]
//往区块链里面添加区块
newBlock.previousHash = self.getLatestBlock().hash
newBlock.hash = newBlock.calculateHash()
self.chain.append(newBlock)

def __str__(self):
return str(self.__dict__)
//校验区块链是不是有效的 有没有人被篡改
def chainIsValid(self):
for index in range(1,len(self.chain)):
currentBlock = self.chain[index]
previousBlock = self.chain[index-1]
if (currentBlock.hash != currentBlock.calculateHash()):
return False
if previousBlock.hash != currentBlock.previousHash:
return False
return True

myCoin = BlockChain()

#print block info 打印区块链信息
print("print block info ####:")
for block in myCoin.chain:
print(block)
#check blockchain is valid 检查区块链是不是有效的
print("before tamper block,blockchain is valid ###")
print(myCoin.chainIsValid())
#tamper the blockinfo  篡改区块2的数据
myCoin.chain[1].data = "{amount:1002}"
print("after tamper block,blockchain is valid ###")
print(myCoin.chainIsValid())

``````

``````print block info ####:
{'index': 0, 'timestamp': '01/01/2018', 'data': 'genesis block', 'previousHash': '0', 'hash': 'd8d21e5ba33780d5eb77d09d3b407ceb8ade4e5545ef951de1997b209d91e264'}
{'index': 1, 'timestamp': '02/01/2018', 'data': '{amount:4}', 'previousHash': 'd8d21e5ba33780d5eb77d09d3b407ceb8ade4e5545ef951de1997b209d91e264', 'hash': '15426e32db30f4b26aa719ba5e573f372f41e27e4728eb9e9ab0bea8eae63a9d'}
{'index': 2, 'timestamp': '03/01/2018', 'data': '{amount:5}', 'previousHash': '15426e32db30f4b26aa719ba5e573f372f41e27e4728eb9e9ab0bea8eae63a9d', 'hash': '75119e897f21c769acee6e32abcefc5e88e250a1f35cc95946379436050ac2f0'}
before tamper block,blockchain is valid ###
True
after tamper block,blockchain is valid ###
False
``````

### 1.4 区块链的核心-工作量证明算法

work),在介绍pow之前先思考一下为什么要工作量证明算法,或者再往前想一步为什么比特币如何解决信任的问题?

#### 1.4.3 代码实现

``````

from hashlib import sha256
import time
class Block:

def __init__(self,index,timestamp,data,previousHash=""):

self.index = index
self.timestamp = timestamp
self.data = data
self.previousHash = previousHash
self.nonce = 0 //代表当前计算了多少次hash计算
self.hash = self.calculateHash()

def calculateHash(self):
plainData = str(self.index)+str(self.timestamp)+str(self.data)+str(self.nonce)
return sha256(plainData.encode('utf-8')).hexdigest()
#挖矿 difficulty代表复杂度 表示前difficulty位都为0才算成功
def minerBlock(self,difficulty):
while(self.hash[0:difficulty]!=str(0).zfill(difficulty)):
self.nonce+=1
self.hash = self.calculateHash()

def __str__(self):
return str(self.__dict__)

class BlockChain:

def __init__(self):
self.chain = [self.createGenesisBlock()]
self.difficulty = 5

def createGenesisBlock(self):
return Block(0,"01/01/2018","genesis block")

def getLatestBlock(self):
return self.chain[len(self.chain)-1]
#添加区块前需要 做一道计算题😶,坐完后才能把区块加入到链上
newBlock.previousHash = self.getLatestBlock().hash
newBlock.minerBlock(self.difficulty)
self.chain.append(newBlock)

def __str__(self):
return str(self.__dict__)

def chainIsValid(self):
for index in range(1,len(self.chain)):
currentBlock = self.chain[index]
previousBlock = self.chain[index-1]
if (currentBlock.hash != currentBlock.calculateHash()):
return False
if previousBlock.hash != currentBlock.previousHash:
return False
return True

myCoin = BlockChain()

# 下面打印了每个区块挖掘需要的时间 比特币通过一定的机制控制在10分钟出一个块
# 其实就是根据当前网络算力 调整我们上面difficulty值的大小,如果你在
# 本地把上面代码difficulty的值调很大你可以看到很久都不会出计算结果
startMinerFirstBlockTime = time.time()
print("start to miner first block time :"+str(startMinerFirstBlockTime))

print("miner first block time completed" + ",used " +str(time.time()-startMinerFirstBlockTime) +"s")

startMinerSecondBlockTime = time.time()

print("start to miner first block time :"+str(startMinerSecondBlockTime))

print("miner second block time completed" + ",used " +str(time.time()-startMinerSecondBlockTime) +"sn")

#print block info
print("print block info ####:n")
for block in myCoin.chain:
print("n")
print(block)

#check blockchain is valid
print("before tamper block,blockchain is valid ###")
print(myCoin.chainIsValid())

#tamper the blockinfo
myCoin.chain[1].data = "{amount:1002}"
print("after tamper block,blockchain is valid ###")
print(myCoin.chainIsValid())

``````

## 2 快速实现一个区块链

### 2.3 什么是挖矿

1、计算工作量证明poW
2、通过新增一个交易赋予矿工（自已）一个币
3、构造新区块并将其添加到链中

### 2.5 实现代码

``````

import hashlib
import json
import requests
from textwrap import dedent
from time import time
from uuid import uuid4
from urllib.parse import urlparse

class Blockchain(object):
def __init__(self):
...
self.nodes = set()
# 用 set 来储存节点，避免重复添加节点.
...
self.chain = []
self.current_transactions = []

#创建创世区块
self.new_block(previous_hash=1,proof=100)

"""
在节点列表中添加一个新节点
:return:
"""

def valid_chain(self,chain):
"""
确定一个给定的区块链是否有效
:param chain:
:return:
"""
last_block = chain[0]
current_index = 1

while current_index<len(chain):
block = chain[current_index]
print(f'{last_block}')
print(f'{block}')
print("n______n")
# 检查block的散列是否正确
if block['previous_hash'] != self.hash(last_block):
return False
# 检查工作证明是否正确
if not self.valid_proof(last_block['proof'], block['proof']):
return False

last_block = block
current_index += 1
return True

def ressolve_conflicts(self):
"""
共识算法
:return:
"""
neighbours = self.nodes
new_chain = None
# 寻找最长链条
max_length = len(self.chain)

# 获取并验证网络中的所有节点的链
for node in neighbours:
response = requests.get(f'http://{node}/chain')

if response.status_code == 200:
length = response.json()['length']
chain = response.json()['chain']

# 检查长度是否长，链是否有效
if length > max_length and self.valid_chain(chain):
max_length = length
new_chain = chain

# 如果发现一个新的有效链比当前的长，就替换当前的链
if new_chain:
self.chain = new_chain
return True
return False

def new_block(self,proof,previous_hash=None):
"""
创建一个新的块并将其添加到链中
:param proof: 由工作证明算法生成证明
:param previous_hash: 前一个区块的hash值
:return: 新区块
"""
block = {
'index':len(self.chain)+1,
'timestamp':time(),
'transactions':self.current_transactions,
'proof':proof,
'previous_hash':previous_hash or self.hash(self.chain[-1]),
}

# 重置当前交易记录
self.current_transactions = []

self.chain.append(block)
return block

def new_transaction(self,sender,recipient,amount):
# 将新事务添加到事务列表中
"""
Creates a new transaction to go into the next mined Block
:param sender:发送方的地址
:param recipient:收信人地址
:param amount:数量
:return:保存该事务的块的索引
"""
self.current_transactions.append({
'sender':sender,
'recipient':recipient,
'amount':amount,
})

return  self.last_block['index'] + 1

@staticmethod
def hash(block):
"""
给一个区块生成 SHA-256 值
:param block:
:return:
"""
# 必须确保这个字典（区块）是经过排序的，否则将会得到不一致的散列
block_string = json.dumps(block,sort_keys=True).encode()
return hashlib.sha256(block_string).hexdigest()

@property
def last_block(self):
# 返回链中的最后一个块
return self.chain[-1]

def proof_of_work(self,last_proof):
# 工作算法的简单证明
proof = 0
while self.valid_proof(last_proof,proof)is False:
proof +=1
return proof

@staticmethod
def valid_proof(last_proof,proof):
# 验证证明
guess =  f'{last_proof}{proof}'.encode()
guess_hash = hashlib.sha256(guess).hexdigest()
return guess_hash[:4] =="0000"

# 实例化节点

# 为该节点生成一个全局惟一的地址
node_identifier = str(uuid4()).replace('-','')

# 实例化Blockchain类
blockchain = Blockchain()

# 进行挖矿请求
@app.route('/mine',methods=['GET'])
def mine():
# 运行工作算法的证明来获得下一个证明。
last_block = blockchain.last_block
last_proof = last_block['proof']
proof = blockchain.proof_of_work(last_proof)

# 必须得到一份寻找证据的奖赏。
blockchain.new_transaction(
sender="0",
recipient=node_identifier,
amount=1,
)

# 通过将其添加到链中来构建新的块
previous_hash = blockchain.hash(last_block)
block = blockchain.new_block(proof,previous_hash)
response = {
'message': "New Block Forged",
'index': block['index'],
'transactions': block['transactions'],
'proof': block['proof'],
'previous_hash': block['previous_hash'],
}
return jsonify(response), 200

# 创建交易请求
@app.route('/transactions/new',methods=['POST'])
def new_transactions():
values = request.get_json()

# 检查所需要的字段是否位于POST的data中
required = ['seder','recipient','amount']
if not all(k in values for k in request):
return 'Missing values',400

#创建一个新的事物
index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount'])
response = {'message': f'Transaction will be added to Block {index}'}
return jsonify(response), 201

# 获取所有快信息
@app.route('/chain',methods=['GET'])
def full_chain():
response = {
'chain':blockchain.chain,
'length':len(blockchain.chain),
}
return jsonify(response),200

# 添加节点
@app.route('/nodes/register',methods=['POST'])
def  register_nodes():
values = request.get_json()
nodes = values.get('nodes')
if nodes is None:
return "Error: Please supply a valid list of nodes", 400

for node in nodes:
blockchain.register_node(node)

response = {
'message': 'New nodes have been added',
'total_nodes': list(blockchain.nodes),
}
return jsonify(response), 201

# 解决冲突
@app.route('/nodes/resolve', methods=['GET'])
def consensus():
replaced = blockchain.resolve_conflicts()

if replaced:
response = {
'message': 'Our chain was replaced',
'new_chain': blockchain.chain
}
else:
response = {
'message': 'Our chain is authoritative',
'chain': blockchain.chain
}

return jsonify(response), 200

if __name__ == '__main__':
app.run(host='0.0.0.0',port=5000)

``````

🧿 更多资料, 项目分享：