学习用Python实现简单区块链

Python 实现区块链

环境

python3(本次用的3.8)、postman、requests、Flask,pip,pipenv等工具

环境步骤

  1. 先安装一个环境
    pip install pipenv
    pipenv使用

  2. 创建环境
    pipenv install 会生成一个pipfile文件,用于管理库的依赖

  3. 在虚拟环境中安装依赖
    pipenv install flask==2.0.2
    pipenv install requests==2.18.4
    安装成功后可看到pipfile中看到
    在这里插入图片描述

  4. 启动虚拟环境
    pipenv shell

  5. 新建一个blockchain.py 开始撸代码

代码思路

确定区块结构

{
    "index":0, // 块序号
    "timestamp":"",// 时间戳
    "transactions":""[ // 交易信息
        {
            "sender":"",
            "recipient":"",
            "amount":5,
        }
    ],
    "proof":"", // 工作量证明
    "previous_hash":"",// 前区块的哈希值
}

确定一下区块类功能

class Blockchain:
    def __init__(self):
        self.chain = [] # 存块
        self.current_transactions = [] # 交易实体

    def new_block(self): # 新建区块
        pass

    def new_transaction(self):# 新建交易
        pass

    @staticmethod
    def hash(block):# 计算哈希值
        pass

    @property
    def last_block(self):# 获取当前链中最后一个区块
        pass
    
    def proof_of_work(self):# 工作量证明计算
    	pass
   
	def vaild_proof(self):# 验证计算值是否符合要求
		pass

	def vaild_chain(self):# 验证链是否符合要求
		pass

	def register_node(self):# 节点注册
		pass 

	def resolve_conflicts:# 共识算法,解决冲突
		pass

添加交易

def new_transaction(self, sender: str, recipient: str, amount: int) -> int:
        """添加新的交易

        Args:
            sender (str): 发送方
            recipient (str): 接收方
            amount (int): 金额

        Returns:
            int: 返回一个包含此交易的区块序号
        """
        self.current_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount
        })
        return self.last_block['index'] + 1

添加新块

 def new_block(self, proof: int, previous_hash=None):  # 新建区块
        block = {
            'index': len(self.chain) + 1,
            'timestamp': time(),
            'transactions': self.current_transactions,
            'proof': proof,
            'previous_hash': previous_hash or self.hash(self.last_block),
        }
        self.current_transactions = []  # 新建区块打包后重置当前交易信息
        self.chain.append(block)  # 把新建的区块加入链
        return block

计算哈希值

@staticmethod
    def hash(block: Dict[str, Any]) -> str:
        """计算哈希值,返回哈希后的摘要信息

        Args:
            block (Dict[str, Any]): 传入一个块

        Returns:
            str: 摘要信息
        """
        block_string = json.dumps(block, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()

工作量证明与验证

def proof_of_work(self, last_proof: int) -> int:
        """工作量计算,计算一个符合要求的哈希值

        Args:
            last_proof (int): 上一个块的工作量随机数

        Returns:
            int: 返回符合要求的工作量随机数
        """
        proof = 0
        while self.valid_proof(last_proof, proof) is False:
            proof += 1
        # print(proof) 输出计算结果
        return proof

def valid_proof(self, last_proof: int, proof: int) -> bool:
        """工作量证明验证,验证计算结果是否以2个0开头

        Args:
            last_proof (int): 前工作证明
            proof (int): 当前工作证明

        Returns:
            bool: 返回验证是否有效
        """
        guess = f'{last_proof}{proof}'.encode()
        guess_hash = hashlib.sha256(guess).hexdigest()
        # print(guess_hash) 输出计算过程
        if guess_hash[0:2] == "00":
            return True
        else:
            return False

验证一下,创建一个类,设定前一个工作量证明为100
在这里插入图片描述
尝试运行代码
在这里插入图片描述
可以看到这里算出前两位为0的就停止,结果为226
在这里插入图片描述

节点注册

def register_node(self, address: str) -> None:
        """添加一个新节点到节点集中

        Args:
            address (str): 节点的地址。Eg:"http://127.0.0.1:5002"
        """
        parsed_url = urlparse(address)  # 解析url参数
        self.nodes.add(parsed_url.netloc)  # 获取域名服务器

共识算法

def resolve_conflicts(self) -> bool:
        """共识算法,解决冲突,以最长且有效的链为主

        Returns:
            bool: 冲突是否解决成功
        """
        neighbours = self.nodes  # 获取节点信息
        new_chain = None  # 定义可能的新链

        max_length = len(self.chain)  # 获取当前链长度

        for node in neighbours:  # 获取节点的链条信息,如果更长且有效则直接替换
            response = requests.get(f'http://{node}/chain')

            if response.status_code == 200:
                length = response.json()['length']
                chain = response.json()['chain']

                if length > max_length and self.vaild_chain(chain):
                    max_length = length
                    new_chain = chain

        if new_chain:
            self.chain = new_chain
            return True
        return False

Web功能

使用flask 部署服务器

app = Flask(__name__)

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)

尝试运行代码
在这里插入图片描述
从postman 中可看到一些信息,目前没有定义接口,自然是404
在这里插入图片描述

确定一下接口

@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    pass

@app.route('/mine', methods=['GET'])
def mine():
    pass

@app.route('/chain', methods=['GET'])
def full_chain():
    pass

@app.route('/nodes/register', methods=['POST'])
def register_nodes():
	pass

@app.route('/nodes/resolve', methods=['GET'])
def consensus():
	pass

交易接口

# 添加新建交易接口
@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    values = request.get_json()
    required = ['sender', 'recipient', 'amount']
    if not all(k in values for k in required):
        return '确少参数', 400

    index = blockchain.new_transaction(values['sender'], values['recipient'],
                                       values['amount'])

    response = {'message': f'交易将会被添加到块 {index}'}
    return jsonify(response), 201

查看链接口

# 查看链接口
@app.route('/chain', methods=['GET'])
def full_chain():
    response = {
        'chain': blockchain.chain,
        'length': len(blockchain.chain),
    }
    return jsonify(response), 200

打包区块接口

@app.route('/mine', methods=['GET'])
def mine():
    last_block = blockchain.last_block  # 获取链上最后一个区块的信息
    last_proof = last_block['proof']
    proof = blockchain.proof_of_work(last_proof)

    # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励
    blockchain.new_transaction(
        sender="0",
        recipient=node_identifier,
        amount=1,
    )

    block = blockchain.new_block(proof, None)  # 生成一个新块

    response = {
        'message': "打包成功,新区块已生成!",
        'index': block['index'],
        'transactions': block['transactions'],
        'proof': block['proof'],
        'previous_hash': block['previous_hash'],
    }
    return jsonify(response), 200

去postman 依次请求chain(查看当前链)、transactions/new(新建交易,注意是post方法提交的json数据)、mine(打包区块)、chain
在这里插入图片描述

在这里插入图片描述
这时,一个单节点的区块链流程基本实现。

节点注册接口

# 节点注册接口
@app.route('/nodes/register', methods=['POST'])
def register_nodes():
    values = request.get_json()

    nodes = values.get('nodes')
    if nodes is None:
        return "Error: 请提供一个符合规则的节点", 400

    for node in nodes:
        blockchain.register_node(node)

    response = {
        'message': '新节点已经被添加!',
        'total_nodes': list(blockchain.nodes),
    }
    return jsonify(response), 201

测试一下
在这里插入图片描述

共识接口

#  共识接口
@app.route('/nodes/resolve', methods=['GET'])
def consensus():
    replaced = blockchain.resolve_conflicts()

    if replaced:
        response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain}
    else:
        response = {'message': '当前链符合要求', 'chain': blockchain.chain}

    return jsonify(response), 200

完整运行

  • 打开两个终端构造两个节点5000 和 5001
    在这里插入图片描述
  • 分别查看5000节点 和 5001节点 的当前链
    5000节点
    在这里插入图片描述
    5001节点
    在这里插入图片描述
    均只有一个创世区块
  • 接着对5000节点,建立新的交易,并封装区块,再次查看链,此时应该有链上应该有两个区块,而5001节点没有操作,所以链中只有一个块
    在这里插入图片描述
  • 此时将5001节点注册到5000节点中,5000节点注册到5001节点中,由于共识机制,选取最长的链,所以此时查看5001节点的链时,会出现两个节点而不是一个。
    在这里插入图片描述
    至此,简易区块链实现结束。

完整代码

import hashlib
import json
from time import time
from urllib.parse import urlparse  # url解析
from uuid import uuid4  # 生成唯一id
from flask import Flask, jsonify, request
from typing import Any, Dict, List
import requests
from argparse import ArgumentParser  # 命令行参数解析


class Blockchain:

    def __init__(self):
        self.chain = []  # 存块
        self.current_transactions = []  # 交易实体
        self.nodes = set()  # 无重复的节点集合

        # 创建创世区块
        self.new_block(previous_hash='1', proof=100)

    def register_node(self, address: str) -> None:
        """添加一个新节点到节点集中

        Args:
            address (str): 节点的地址。Eg:"http://127.0.0.1:5002"
        """
        parsed_url = urlparse(address)  # 解析url参数
        self.nodes.add(parsed_url.netloc)  # 获取域名服务器

    def new_block(self, proof: int, previous_hash=None):  # 新建区块
        block = {
            'index': len(self.chain) + 1,
            'timestamp': time(),
            'transactions': self.current_transactions,
            'proof': proof,
            'previous_hash': previous_hash or self.hash(self.last_block),
        }
        self.current_transactions = []  # 新建区块打包后重置当前交易信息
        self.chain.append(block)  # 把新建的区块加入链
        return block

    def new_transaction(self, sender: str, recipient: str, amount: int) -> int:
        """添加新的交易

        Args:
            sender (str): 发送方
            recipient (str): 接收方
            amount (int): 金额

        Returns:
            int: 返回一个包含此交易的区块序号
        """
        self.current_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount
        })
        return self.last_block['index'] + 1

    @staticmethod
    def hash(block: Dict[str, Any]) -> str:
        """计算哈希值,返回哈希后的摘要信息

        Args:
            block (Dict[str, Any]): 传入一个块

        Returns:
            str: 摘要信息
        """
        block_string = json.dumps(block, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()

    @property
    def last_block(self) -> Dict[str, Any]:  # 获取当前链中最后一个区块
        return self.chain[-1]

    def proof_of_work(self, last_proof: int) -> int:
        """工作量计算,计算一个符合要求的哈希值

        Args:
            last_proof (int): 上一个块的工作量随机数

        Returns:
            int: 返回符合要求的工作量随机数
        """
        proof = 0
        while self.valid_proof(last_proof, proof) is False:
            proof += 1
        # print(proof) 输出计算结果
        return proof

    def vaild_chain(self, chain: List[Dict[str, Any]]) -> bool:
        """验证链是否合理:最长且有效

        Args:
            chain (List[Dict[str, Any]]): 传入链

        Returns:
            bool: 返回是否有效
        """
        last_block = chain[0]  # 从第一个创世区块开始遍历验证
        current_index = 1

        while current_index < len(chain):
            block = chain[current_index]
            print(f'{last_block}')
            print(f'{block}')
            print("n-----------n")
            # 如果当前区块的前哈希和前一个计算出来的哈希值不同则是无效链
            if block['previous_hash'] != self.hash(last_block):
                return False

            # 检验工作量证明是否符合要求
            if not self.valid_proof(last_block['proof'], block['proof']):
                return False

            last_block = block
            current_index += 1

        return True

    def valid_proof(self, last_proof: int, proof: int) -> bool:
        """工作量证明验证,验证计算结果是否以2个0开头

        Args:
            last_proof (int): 前工作证明
            proof (int): 当前工作证明

        Returns:
            bool: 返回验证是否有效
        """
        guess = f'{last_proof}{proof}'.encode()
        guess_hash = hashlib.sha256(guess).hexdigest()
        # print(guess_hash) 输出计算过程
        if guess_hash[0:2] == "00":
            return True
        else:
            return False

    def resolve_conflicts(self) -> bool:
        """共识算法,解决冲突,以最长且有效的链为主

        Returns:
            bool: 冲突是否解决成功
        """
        neighbours = self.nodes  # 获取节点信息
        new_chain = None  # 定义可能的新链

        max_length = len(self.chain)  # 获取当前链长度

        for node in neighbours:  # 获取节点的链条信息,如果更长且有效则直接替换
            response = requests.get(f'http://{node}/chain')

            if response.status_code == 200:
                length = response.json()['length']
                chain = response.json()['chain']

                if length > max_length and self.vaild_chain(chain):
                    max_length = length
                    new_chain = chain

        if new_chain:
            self.chain = new_chain
            return True
        return False


app = Flask(__name__)  # flask框架
node_identifier = str(uuid4()).replace('-', '')  # 使者获取一个唯一的uid
blockchain = Blockchain()


# 添加新建交易接口
@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    values = request.get_json()
    required = ['sender', 'recipient', 'amount']
    if not all(k in values for k in required):
        return '确少参数', 400

    index = blockchain.new_transaction(values['sender'], values['recipient'],
                                       values['amount'])

    response = {'message': f'交易将会被添加到块 {index}'}
    return jsonify(response), 201


# 添加新建打包区块接口
@app.route('/mine', methods=['GET'])
def mine():
    last_block = blockchain.last_block  # 获取链上最后一个区块的信息
    last_proof = last_block['proof']
    proof = blockchain.proof_of_work(last_proof)

    # 发送者为 "0" 表明是新挖出的币,为矿工提供奖励
    blockchain.new_transaction(
        sender="0",
        recipient=node_identifier,
        amount=1,
    )

    block = blockchain.new_block(proof, None)  # 生成一个新块

    response = {
        'message': "打包成功,新区块已生成!",
        'index': block['index'],
        'transactions': block['transactions'],
        'proof': block['proof'],
        'previous_hash': block['previous_hash'],
    }
    return jsonify(response), 200


# 查看链接口
@app.route('/chain', methods=['GET'])
def full_chain():
    response = {
        'chain': blockchain.chain,
        'length': len(blockchain.chain),
    }
    return jsonify(response), 200


# 节点注册接口
@app.route('/nodes/register', methods=['POST'])
def register_nodes():
    values = request.get_json()

    nodes = values.get('nodes')
    if nodes is None:
        return "Error: 请提供一个符合规则的节点", 400

    for node in nodes:
        blockchain.register_node(node)

    response = {
        'message': '新节点已经被添加!',
        'total_nodes': list(blockchain.nodes),
    }
    return jsonify(response), 201


#  共识接口
@app.route('/nodes/resolve', methods=['GET'])
def consensus():
    replaced = blockchain.resolve_conflicts()

    if replaced:
        response = {'message': '当前链不符合要求,已被替换', 'new_chain': blockchain.chain}
    else:
        response = {'message': '当前链符合要求', 'chain': blockchain.chain}

    return jsonify(response), 200


if __name__ == "__main__":

    parser = ArgumentParser()  # 命令行参数解析,端口默认5000
    parser.add_argument('-p',
                        '--port',
                        default=5000,
                        type=int,
                        help='port to listen on')
    args = parser.parse_args()
    port = args.port

    app.run(host='127.0.0.1', port=port)  # 启动web服务,默认本机


本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码

)">
< <上一篇
下一篇>>