openblockchain是IBM开源的blockchain项目,具体安装流程之前已经介绍过,具体请看http://blog.csdn.net/pangjiuzala/article/details/50897819。
解压后会发现在obc-peer根目录下出现一个main.go文件,其中主要功能是生成obc-peer命令,核心代码集中在openchain中的。接下来,将首先从chaincode代码分析开始,包含了如图下图所示的几个核心文件。
chaincode.go
chaincode.go位于shim包中,
var chaincodeLogger = logging.MustGetLogger("chaincode")
//调用go-logging中logging库的MustGetLogger函数对shim package进行记录,相当于日志文件
//其中传递的参数为当前go文件
MustGetLogger
MustGetLogger位于logger.go文件中,具体代码如下
// GetLogger 创建和返回一个基于模块名称的Logger对象
func GetLogger(module string) (*Logger, error) {
return &Logger{Module: module}, nil
}
// MustGetLogger 与GetLogger相似,但是当logger不能被创建时会出现
//错乱,它简化了安全初始化全局记录器
func MustGetLogger(module string) *Logger {
logger, err := GetLogger(module)
if err != nil {
panic("logger: " + module + ": " + err.Error())
}
return logger
}
定义chaincode接口,它是一个供chaincode开发者需要实现标准回调接口
type Chaincode interface {
// run方法在每一笔交易初始化的时候被调用
Run(stub *ChaincodeStub, function string, args []string) ([]byte, error)
// Query函数以只读方式查询chaincode状态
Query(stub *ChaincodeStub, function string, args []string) ([]byte, error)
}
Start(cc Chaincode)
启动chaincode引导程序的入口节点
func Start(cc Chaincode) error {
viper.SetEnvPrefix("OPENCHAIN")
viper.AutomaticEnv()
replacer := strings.NewReplacer(".", "_")
//替换前缀为openchain的文件中的.为_
viper.SetEnvKeyReplacer(replacer)
flag.StringVar(&peerAddress, "peer.address", "", "peer address")
flag.Parse()
chaincodeLogger.Debug("Peer address: %s", getPeerAddress())
// 使用同步检验建立与client的连接
clientConn, err := newPeerClientConnection()
if err != nil {
chaincodeLogger.Error(fmt.Sprintf("Error trying to connect to local peer: %s", err))
return fmt.Errorf("Error trying to connect to local peer: %s", err)
}
chaincodeLogger.Debug("os.Args returns: %s", os.Args)
chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn)
err = chatWithPeer(chaincodeSupportClient, cc)
//启动chiancodeSuppportClient
return err
}
getPeerAddress()
获取peer地址
func getPeerAddress() string {
if peerAddress != "" {
return peerAddress
}
if peerAddress = viper.GetString("peer.address"); peerAddress == "" {
//假如被docker容器包含,返回主机地址
peerAddress = "172.17.42.1:30303"
}
return peerAddress
}
newPeerClientConnection()
创建peer 客户端连接
func newPeerClientConnection() (*grpc.ClientConn, error) {
//调用google.golang.org中grpc的ClinetConn方法
var opts []grpc.DialOption
if viper.GetBool("peer.tls.enabled") {
//viper来源于github.com/spf13/viper,一个应用程序的配置系统
var sn string
if viper.GetString("peer.tls.server-host-override") != "" {
sn = viper.GetString("peer.tls.server-host-override")
}
var creds credentials.TransportAuthenticator
//credenetials包实现了各种支持g RPC库调用的凭证
if viper.GetString("peer.tls.cert.file") != "" {
var err error
creds, err = credentials.NewClientTLSFromFile(viper.GetString("peer.tls.cert.file"), sn)
if err != nil {
grpclog.Fatalf("Failed to create TLS credentials %v", err)
}
} else {
creds = credentials.NewClientTLSFromCert(nil, sn)
//NewClientTLSFromCert从客户端输入的证书中构造了TLS
}
//append中grpc调用的方法在grpc根目录下下clientconn.go文件中定义方//法
opts = append(opts, grpc.WithTransportCredentials(creds))
}
///WithTransportCredentials返回配置了一个连接级别的安全凭据(例//如,TLS/ SSL)的拨号操作。
opts = append(opts, grpc.WithTimeout(1*time.Second))
//WithTimeout返回配置客户端拨号超时连接的拨号操作
opts = append(opts, grpc.WithBlock())
//WithBlock返回一个拨号选项,它将持续调用一个拨号块直到底层的连接已经建立。没有这一点,在后台发生将会立即返回拨打和服务器连接。
opts = append(opts, grpc.WithInsecure())
conn, err := grpc.Dial(getPeerAddress(), opts...)
if err != nil {
return nil, err
}
return conn, err
}
chatWithPeer()
使用peer进行通信
func chatWithPeer(chaincodeSupportClient pb.ChaincodeSupportClient, cc Chaincode) error {
// 通过peer验证创建流
stream, err := chaincodeSupportClient.Register(context.Background())
if err != nil {
return fmt.Errorf("Error chatting with leader at address=%s: %s", getPeerAddress(), err)
}
//创建传递给链码的链码存根
//stub := &ChaincodeStub{}
// Create the shim handler responsible for all control logic
handler = newChaincodeHandler(getPeerAddress(), stream, cc)
defer stream.CloseSend()
// Send the ChaincodeID during register.
chaincodeID := &pb.ChaincodeID{Name: viper.GetString("chaincode.id.name")}
payload, err := proto.Marshal(chaincodeID)
if err != nil {
return fmt.Errorf("Error marshalling chaincodeID during chaincode registration: %s", err)
}
// 流寄存器
chaincodeLogger.Debug("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload})
waitc := make(chan struct{})
go func() {
defer close(waitc)
msgAvail := make(chan *pb.ChaincodeMessage)
var nsInfo *nextStateInfo
var in *pb.ChaincodeMessage
recv := true
for {
in = nil
err = nil
nsInfo = nil
if recv {
recv = false
go func() {
var in2 *pb.ChaincodeMessage
in2, err = stream.Recv()
msgAvail <- in2
}()
}
select {
case in = <-msgAvail:
if err == io.EOF {
chaincodeLogger.Debug("Received EOF, ending chaincode stream, %s", err)
return
} else if err != nil {
chaincodeLogger.Error(fmt.Sprintf("Received error from server: %s, ending chaincode stream", err))
return
} else if in == nil {
err = fmt.Errorf("Received nil message, ending chaincode stream")
chaincodeLogger.Debug("Received nil message, ending chaincode stream")
return
}
chaincodeLogger.Debug("[%s]Received message %s from shim", shortuuid(in.Uuid), in.Type.String())
recv = true
case nsInfo = <-handler.nextState:
in = nsInfo.msg
if in == nil {
panic("nil msg")
}
chaincodeLogger.Debug("[%s]Move state message %s", shortuuid(in.Uuid), in.Type.String())
}
// 调用 FSM.handleMessage(),fsm即状态机
err = handler.handleMessage(in)
if err != nil {
err = fmt.Errorf("Error handling message: %s", err)
return
}
if nsInfo != nil && nsInfo.sendToCC {
chaincodeLogger.Debug("[%s]send state message %s", shortuuid(in.Uuid), in.Type.String())
if err = handler.serialSend(in); err != nil {
err = fmt.Errorf("Error sending %s: %s", in.Type.String(), err)
return
}
}
}
}()
<-waitc
return err
}
GetState
GetState被调用后,将从总帐中获取chaincode状态记录
func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
return handler.handleGetState(key, stub.UUID)
PutState
相对于GetState,PutState被调用后,将会把chaincode状态记录到总账中
func (stub *ChaincodeStub) PutState(key string, value []byte) error {
return handler.handlePutState(key, value, stub.UUID)
}
DelState
从总账中删除chaincode状态记录
func (stub *ChaincodeStub) DelState(key string) error {
return handler.handleDelState(key, stub.UUID)
}
StateRangeQueryIterator
StateRangeQueryIterator是一个迭代器,遍历一定范围内的以键值对形式记录的状态
type StateRangeQueryIterator struct {
handler *Handler
uuid string
response *pb.RangeQueryStateResponse
currentLoc int
}
StateRangeQueryIterator
chaincode 调用RangeQueryState来查询一定范围内键的状态。假设startKey和endKey按词汇顺序排序,将返回一个迭代器,可用于遍历startKey和endKey之间的所有键,并且迭代器返回顺序是随机的。
func (stub *ChaincodeStub) RangeQueryState(startKey, endKey string) (*StateRangeQueryIterator, error) {
response, err := handler.handleRangeQueryState(startKey, endKey, stub.UUID)
if err != nil {
return nil, err
}
return &StateRangeQueryIterator{handler, stub.UUID, response, 0}, nil
}
HasNext()
如果迭代器的查询范围包含附加键和值,hasnext将返回true
func (iter *StateRangeQueryIterator) HasNext() bool {
if iter.currentLoc < len(iter.response.KeysAndValues) || iter.response.HasMore {
return true
}
return false
}
Next()
Next将返回下一个在迭代器查询范围内的键和值
func (iter *StateRangeQueryIterator) Next() (string, []byte, error) {
if iter.currentLoc < len(iter.response.KeysAndValues) {
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil
} else if !iter.response.HasMore {
return "", nil, errors.New("No such key")
} else {
response, err := iter.handler.handleRangeQueryStateNext(iter.response.ID, iter.uuid)
//返回迭代器的响应id和uuid
if err != nil {
return "", nil, err
}
iter.currentLoc = 0
iter.response = response
keyValue := iter.response.KeysAndValues[iter.currentLoc]
iter.currentLoc++
return keyValue.Key, keyValue.Value, nil
}
}
Close()
当读取过程完成从迭代器中释放资源后,调用Close函数关闭范围查询迭代器
func (iter *StateRangeQueryIterator) Close() error {
_, err := iter.handler.handleRangeQueryStateClose(iter.response.ID, iter.uuid)
return err
}
InvokeChaincode()
通过一个chaincode调用中执行对另一个chaincode的调用
func (stub *ChaincodeStub) InvokeChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
return handler.handleInvokeChaincode(chaincodeName, function, args, stub.UUID)
}
QueryChaincode
当一个chaincode调用中执行对另一个chaincode的查询操作时调用
QueryChaincode
func (stub *ChaincodeStub) QueryChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
return handler.handleQueryChaincode(chaincodeName, function, args, stub.UUID)
}
GetRows
接下来的方法是对数据库建表以及查询的一些功能函数,这里就不一一赘述,以GetRows方法为例介绍
GetRows返回基于部分key的行,例如 ,下给出下表
| A | B | C | D |
当A,C和D是key的时候,GetRow方法可以被|A,C|调用,来返回所有包含A,C以及其他例如D的值的行。当然,只包含A时也可以获取C以及D的值
func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row, error) {
keyString, err := buildKeyString(tableName, key)
if err != nil {
return nil, err
}
iter, err := stub.RangeQueryState(keyString+"1", keyString+":")
if err != nil {
return nil, fmt.Errorf("Error fetching rows: %s", err)
}
defer iter.Close()
rows := make(chan Row)
go func() {
for iter.HasNext() {
_, rowBytes, err := iter.Next()
if err != nil {
close(rows)
}
var row Row
err = proto.Unmarshal(rowBytes, &row)
//调用proto的Unmarshal函数,实现对数据的散集
if err != nil {
close(rows)
}
rows <- row
}
close(rows)
}()
return rows, nil
}
chaincode.pb.go
chaincode.pb.go是从chaincode.proto文件中产生的,它包含了一些顶端消息
- 列的定义
- 表信息
- 行信息
- 列信息
ColumnDefinition
type ColumnDefinition struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Type ColumnDefinition_Type `protobuf:"varint,2,opt,name=type,enum=shim.ColumnDefinition_Type" json:"type,omitempty"`
Key bool `protobuf:"varint,3,opt,name=key" json:"key,omitempty"`
}
Table
type Table struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
//protobuf以bytes为单位,opt未操作
//json中传递的是表名,可以忽略空表情况
ColumnDefinitions []*ColumnDefinition `protobuf:"bytes,2,rep,name=columnDefinitions" json:"columnDefinitions,omitempty"`
}
Column
type Column struct {
// Types that are valid to be assigned to Value:
// *Column_String_
// *Column_Int32
// *Column_Int64
// *Column_Uint32
// *Column_Uint64
// *Column_Bytes
// *Column_Bool
Value isColumn_Value `protobuf_oneof:"value"`
}
Row
type Row struct {
Columns []*Column `protobuf:"bytes,1,rep,name=columns" json:"columns,omitempty"`
}
shim包下handler.go
Handler Struct
Handler实现shim包中的一面
type Handler struct {
sync.RWMutex
// RWMutex提供了四个方法:
// func (*RWMutex) Lock 写锁定
// func (*RWMutex) Unlock 写解锁
// func (*RWMutex) RLock 读锁定
// func (*RWMutex) RUnlock 读解锁
To string
ChatStream PeerChaincodeStream
FSM *fsm.FSM
cc Chaincode
// 多个查询(和一个事务)具有不同的UUID可以并行为执行chaincode
// responseChannel是其上响应由shim到chaincodeStub连通的通道。
responseChannel map[string]chan pb.ChaincodeMessage
// 跟踪哪些是的UUID交易,这是查询,以决定是否允许获取/把状态并调用chaincode。
isTransaction map[string]bool
//isTransaction的key为String类型,value为bool类型
nextState chan *nextStateInfo
}
PeerChaincodeStream接口定义了Peer和chaincode实例之间的流。
type PeerChaincodeStream interface {
Send(*pb.ChaincodeMessage) error
Recv() (*pb.ChaincodeMessage, error)
}
markIsTransaction
markIsTransaction函数标志着UUID作为交易或查询,
为true的时候代表交易,为false的时候代表查询
func (handler *Handler) markIsTransaction(uuid string, isTrans bool) bool {
if handler.isTransaction == nil {
return false
}
handler.Lock()
defer handler.Unlock()
handler.isTransaction[uuid] = isTrans
return true
}
exectransaction.go
Execute
执行交易或查询
func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) ([]byte, error) {
var err error
// 得到一个处理账本至标记TX的开始/结束
ledger, ledgerErr := ledger.GetLedger()
if ledgerErr != nil {
return nil, fmt.Errorf("Failed to get handle to ledger (%s)", ledgerErr)
}
if secHelper := chain.getSecHelper(); nil != secHelper {
var err error
t, err = secHelper.TransactionPreExecution(t)
// 注意,t被现在解密并且是原始输入t的深克隆
if nil != err {
return nil, err
}
}
if t.Type == pb.Transaction_CHAINCODE_NEW {
_, err := chain.DeployChaincode(ctxt, t)
if err != nil {
return nil, fmt.Errorf("Failed to deploy chaincode spec(%s)", err)
}
//启动并等待准备就绪
markTxBegin(ledger, t)
_, _, err = chain.LaunchChaincode(ctxt, t)
if err != nil {
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("%s", err)
}
markTxFinish(ledger, t, true)
} else if t.Type == pb.Transaction_CHAINCODE_EXECUTE || t.Type == pb.Transaction_CHAINCODE_QUERY {
//将发动(如有必要,并等待就绪)
cID, cMsg, err := chain.LaunchChaincode(ctxt, t)
if err != nil {
return nil, fmt.Errorf("Failed to launch chaincode spec(%s)", err)
}
//这里应该生效,因为它上面的生效...
chaincode := cID.Name
if err != nil {
return nil, fmt.Errorf("Failed to stablish stream to container %s", chaincode)
}
// 当getTimeout调用被创建的事务块需要注释下一行,并取消注释
timeout := time.Duration(30000) * time.Millisecond
//timeout, err := getTimeout(cID)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve chaincode spec(%s)", err)
}
var ccMsg *pb.ChaincodeMessage
if t.Type == pb.Transaction_CHAINCODE_EXECUTE {
ccMsg, err = createTransactionMessage(t.Uuid, cMsg)
if err != nil {
return nil, fmt.Errorf("Failed to transaction message(%s)", err)
}
} else {
ccMsg, err = createQueryMessage(t.Uuid, cMsg)
if err != nil {
return nil, fmt.Errorf("Failed to query message(%s)", err)
}
}
markTxBegin(ledger, t)
resp, err := chain.Execute(ctxt, chaincode, ccMsg, timeout, t)
if err != nil {
// 交易回滚
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Failed to execute transaction or query(%s)", err)
} else if resp == nil {
// 交易回滚
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Failed to receive a response for (%s)", t.Uuid)
} else {
if resp.Type == pb.ChaincodeMessage_COMPLETED || resp.Type == pb.ChaincodeMessage_QUERY_COMPLETED {
// Success
markTxFinish(ledger, t, true)
return resp.Payload, nil
} else if resp.Type == pb.ChaincodeMessage_ERROR || resp.Type == pb.ChaincodeMessage_QUERY_ERROR {
// Rollback transaction
markTxFinish(ledger, t, false)
return nil, fmt.Errorf("Transaction or query returned with failure: %s", string(resp.Payload))
}
markTxFinish(ledger, t, false)
return resp.Payload, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", t.Uuid, resp.Type)
}
} else {
err = fmt.Errorf("Invalid transaction type %s", t.Type.String())
}
return nil, err
}
ExecuteTransactions
由一个执行数组中的一个上的交易将返回错误阵列之一的每个交易。如果执行成功,数组元素将是零。返回状态的哈希值
func ExecuteTransactions(ctxt context.Context, cname ChainName, xacts []*pb.Transaction) ([]byte, []error) {
var chain = GetChain(cname)
if chain == nil {
// 我们不应该到调到这里来,但在其他方面一个很好的提醒,以更好地处理
panic(fmt.Sprintf("[ExecuteTransactions]Chain %s not found
", cname))
}
errs := make([]error, len(xacts)+1)
for i, t := range xacts {
_, errs[i] = Execute(ctxt, chain, t)
}
ledger, hasherr := ledger.GetLedger()
var statehash []byte
if hasherr == nil {
statehash, hasherr = ledger.GetTempStateHash()
}
errs[len(errs)-1] = hasherr
return statehash, errs
}
chaincode下handler.go
//负责处理对Peer's 侧的chaincode流的管理
type Handler struct {
sync.RWMutex
ChatStream PeerChaincodeStream
FSM *fsm.FSM
ChaincodeID *pb.ChaincodeID
//此处理管理解密部署TX的副本,没有编码
deployTXSecContext *pb.Transaction
chaincodeSupport *ChaincodeSupport
registered bool
readyNotify chan bool
//是对TX UUID的要么调用或查询TX(解密)的映射。每个TX将被添加之前执行并完成时,执行删除操作
txCtxs map[string]*transactionContext
uuidMap map[string]bool
//跟踪哪些是UUID查询的;虽然shim保持包含这个,但它不能被信任
isTransaction map[string]bool
//用来发送并确保后的状态转换完成,
nextState chan *nextStateInfo
}
chaincode就分析到这里,其中有不正确的地方还望读者批评指正。接下来将分析另一个重要的部分Ledger,敬请期待哦