zoukankan      html  css  js  c++  java
  • FastDFS源代码分析之tracker协议分析

    本篇博客主要讲解fastdfs中的tracker协议。

    fastdfs主要用来存储文件:直接把整个文件存储到磁盘上,所以简单直接,可是也有非常大的局限性。

    因此,fastdfs对文件的目录设置和存储是最为核心的。


    为什么这么突然地讲解这些?由于我已经看了一段时间的fastdfs,主要结构都已经搞得比较清晰了。因此,这篇文章我就主要以tracker这一部分的协议来分析。


    其它具体介绍tracker的内容请百度,我就不介绍了,直接从

    int tracker_deal_task(struct fast_task_info *pTask)

    这个函数开始对每一个case进行分析。


    1、storage心跳协议

    case TRACKER_PROTO_CMD_STORAGE_BEAT:
    			TRACKER_CHECK_LOGINED(pTask)
    			result = tracker_deal_storage_beat(pTask);
    			break;

    显然,该协议是从storage层发送给tracker层的数据包,

    #define TRACKER_PROTO_CMD_STORAGE_BEAT              83  //storage heart beat

    那么,storage主要是做了什么:

    storage在启动的时候,会开启一个线程,该线程为

    static void *tracker_report_thread_entrance(void *arg)

    该函数主要是根据配置连接本组对应的tracker,并做一些事情。这里有个while循环,代码如下:

    current_time = g_current_time;
    			if (current_time - last_beat_time >= 
    					g_heart_beat_interval)
    			{
    				if (tracker_heart_beat(pTrackerServer, 
    					&stat_chg_sync_count, 
    					&bServerPortChanged) != 0)
    				{
    					break;
    				}


    也就是每隔至少g_heart_beat_interval(30秒)发送一次心跳。心跳包的主要数据是包头和当前storage的状态信息,

    char out_buff[sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)];

    /*
     * Struct for network transferring: the storage statistics block that the
     * storage server places after the TrackerHeader in its heartbeat packet
     * (the send buffer is sized sizeof(TrackerHeader) + sizeof(FDFSStorageStatBuff)).
     *
     * Every field is a fixed 8-byte buffer, so the struct has no padding and
     * its size is 8 * (number of fields). Most fields come in total/success
     * pairs by name; the last four are named as timestamps.
     * NOTE(review): each buffer presumably holds a 64-bit integer written with
     * the project's long2buff() encoding -- confirm against the sender.
     */
    typedef struct
    {
    	char sz_total_upload_count[8];
    	char sz_success_upload_count[8];
    	char sz_total_append_count[8];
    	char sz_success_append_count[8];
    	char sz_total_modify_count[8];
    	char sz_success_modify_count[8];
    	char sz_total_truncate_count[8];
    	char sz_success_truncate_count[8];
    	char sz_total_set_meta_count[8];
    	char sz_success_set_meta_count[8];
    	char sz_total_delete_count[8];
    	char sz_success_delete_count[8];
    	char sz_total_download_count[8];
    	char sz_success_download_count[8];
    	char sz_total_get_meta_count[8];
    	char sz_success_get_meta_count[8];
    	char sz_total_create_link_count[8];
    	char sz_success_create_link_count[8];
    	char sz_total_delete_link_count[8];
    	char sz_success_delete_link_count[8];
    	char sz_total_upload_bytes[8];
    	char sz_success_upload_bytes[8];
    	char sz_total_append_bytes[8];
    	char sz_success_append_bytes[8];
    	char sz_total_modify_bytes[8];
    	char sz_success_modify_bytes[8];
    	char sz_total_download_bytes[8];
    	char sz_success_download_bytes[8];
    	char sz_total_sync_in_bytes[8];
    	char sz_success_sync_in_bytes[8];
    	char sz_total_sync_out_bytes[8];
    	char sz_success_sync_out_bytes[8];
    	char sz_total_file_open_count[8];
    	char sz_success_file_open_count[8];
    	char sz_total_file_read_count[8];
    	char sz_success_file_read_count[8];
    	char sz_total_file_write_count[8];
    	char sz_success_file_write_count[8];
    	char sz_last_source_update[8];
    	char sz_last_sync_update[8];
    	char sz_last_synced_timestamp[8];
    	char sz_last_heart_beat_time[8];
    } FDFSStorageStatBuff;
    


    tracker主要是做了什么呢?

    对其进行解包,然后把保存在本地的storage信息保存到文件里,调用

    	status = tracker_save_storages();

    调用

    	tracker_mem_active_store_server(pClientInfo->pGroup, 
    				pClientInfo->pStorage);
    如果这个存储服务器还不存在,就将其插入到group中。




    最后调用

    static int tracker_check_and_sync(struct fast_task_info *pTask, 
    			const int status)

    检查对应的状态变化,并将其同步等。

    (需要再具体看看)


    2、报告对应同步时间

    #define TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT	    89  //report src last synced time as dest server


    同样在storage的report线程中执行

    if (sync_time_chg_count != g_sync_change_count && 
    				current_time - last_sync_report_time >= 
    					g_heart_beat_interval)
    			{
    				if (tracker_report_sync_timestamp( 
    					pTrackerServer, &bServerPortChanged)!=0)
    				{
    					break;
    				}
    
    				sync_time_chg_count = g_sync_change_count;
    				last_sync_report_time = current_time;
    			}


    具体的数据包为

    pEnd = g_storage_servers + g_storage_count;
    	for (pServer=g_storage_servers; pServer<pEnd; pServer++)
    	{
    		memcpy(p, pServer->server.id, FDFS_STORAGE_ID_MAX_SIZE);
    		p += FDFS_STORAGE_ID_MAX_SIZE;
    		int2buff(pServer->last_sync_src_timestamp, p);
    		p += 4;
    	}

    也就是遍历当前进程中本组的全部storage server,把它们的id和上次同步的时间戳发给tracker server。

    然后tracker的server存储结构为

    pClientInfo->pGroup->last_sync_timestamps 
    				[src_index][dest_index] = sync_timestamp;


    dest_index 为当前连接的storage在其所在组中的索引值

    dest_index = tracker_mem_get_storage_index(pClientInfo->pGroup,
    			pClientInfo->pStorage);
    	if (dest_index < 0 || dest_index >= pClientInfo->pGroup->count)
    	{
    		status = 0;
    		break;
    	}

    由于本连接的storage是固定不变的,所以dest_index也是固定的;而src_index则是本组其它storage的id所对应的索引,

    首先通过id(ip地址)找到具体的storage,然后再通过指针找到索引位置。


    最后,调用

    	if (++g_storage_sync_time_chg_count % 
    			TRACKER_SYNC_TO_FILE_FREQ == 0)
    	{
    		status = tracker_save_sync_timestamps();
    	}
    	else
    	{
    		status = 0;
    	}
    	} while (0);
    
    	return tracker_check_and_sync(pTask, status);

    按次数定期保存到文件,并进行检查等。

    3、上报磁盘情况

    #define TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE 84  //report disk usage

    同样由该线程定时调用。

    	if (current_time - last_df_report_time >= 
    					g_stat_report_interval)
    			{
    				if (tracker_report_df_stat(pTrackerServer, 
    						&bServerPortChanged) != 0)
    				{
    					break;
    				}
    
    				last_df_report_time = current_time;
    			}


    同样上报这些数据:

    for (i=0; i<g_fdfs_store_paths.count; i++)
    	{
    		if (statvfs(g_fdfs_store_paths.paths[i], &sbuf) != 0)
    		{
    			logError("file: "__FILE__", line: %d, " 
    				"call statfs fail, errno: %d, error info: %s.",
    				__LINE__, errno, STRERROR(errno));
    
    			if (pBuff != out_buff)
    			{
    				free(pBuff);
    			}
    			return errno != 0 ? errno : EACCES;
    		}
    
    		g_path_space_list[i].total_mb = ((int64_t)(sbuf.f_blocks) * 
    					sbuf.f_frsize) / FDFS_ONE_MB;
    		g_path_space_list[i].free_mb = ((int64_t)(sbuf.f_bavail) * 
    					sbuf.f_frsize) / FDFS_ONE_MB;
    		long2buff(g_path_space_list[i].total_mb, pStatBuff->sz_total_mb);
    		long2buff(g_path_space_list[i].free_mb, pStatBuff->sz_free_mb);
    
    		pStatBuff++;
    	}


    tracker这边存储在

    int64_t *path_total_mbs; //total disk storage in MB
    int64_t *path_free_mbs;  //free disk storage in MB

    这里

    path_total_mbs[i] = buff2long(pStatBuff->sz_total_mb);
    		path_free_mbs[i] = buff2long(pStatBuff->sz_free_mb);
    
    		pClientInfo->pStorage->total_mb += path_total_mbs[i];
    		pClientInfo->pStorage->free_mb += path_free_mbs[i];
    



    4、storage server加入到tracker

    #define TRACKER_PROTO_CMD_STORAGE_JOIN              81


    storage线程同样在该处调用

    if (tracker_report_join(pTrackerServer, tracker_index, 
    					sync_old_done) != 0)
    		{
    			sleep(g_heart_beat_interval);
    			continue;
    		}


    发送的包体数据包为:

    /*
     * Wire-format body of the storage "join" request
     * (TRACKER_PROTO_CMD_STORAGE_JOIN) that a storage server sends to a tracker.
     *
     * Numeric values travel as fixed FDFS_PROTO_PKG_LEN_SIZE-byte buffers rather
     * than native ints. In the sender, the struct is followed in the packet by
     * one FDFS_PROTO_IP_PORT_SIZE-byte slot per server in g_tracker_group, each
     * formatted as "ip:port".
     * NOTE(review): tracker_count presumably equals the number of those trailing
     * slots -- confirm against the sender.
     */
    typedef struct
    {
    	char group_name[FDFS_GROUP_NAME_MAX_LEN+1];
    	char storage_port[FDFS_PROTO_PKG_LEN_SIZE];
    	char storage_http_port[FDFS_PROTO_PKG_LEN_SIZE];
    	char store_path_count[FDFS_PROTO_PKG_LEN_SIZE];
    	char subdir_count_per_path[FDFS_PROTO_PKG_LEN_SIZE];
    	char upload_priority[FDFS_PROTO_PKG_LEN_SIZE];
    	char join_time[FDFS_PROTO_PKG_LEN_SIZE]; //storage join timestamp
    	char up_time[FDFS_PROTO_PKG_LEN_SIZE];   //storage service started timestamp
    	char version[FDFS_VERSION_SIZE];   //storage version
    	char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
    	char init_flag;
    	signed char status;
    	char tracker_count[FDFS_PROTO_PKG_LEN_SIZE];  //all tracker server count
    } TrackerStorageJoinBody;
    

    当赋值完毕后,在其后面追加

    p = out_buff + sizeof(TrackerHeader) + sizeof(TrackerStorageJoinBody);
    	pServerEnd = g_tracker_group.servers + g_tracker_group.server_count;
    	for (pServer=g_tracker_group.servers; pServer<pServerEnd; pServer++)
    	{
    		/*
    		if (strcmp(pServer->ip_addr, pTrackerServer->ip_addr) == 0 && 
    			pServer->port == pTrackerServer->port)
    		{
    			continue;
    		}
    		tracker_count++;
    		*/
    
    		sprintf(p, "%s:%d", pServer->ip_addr, pServer->port);
    		p += FDFS_PROTO_IP_PORT_SIZE;
    	}

    追加全部tracker server的信息,格式为ip:port


    tracker server接收

    	case TRACKER_PROTO_CMD_STORAGE_JOIN:
    			result = tracker_deal_storage_join(pTask);
    			break;


    获取到的相关信息存储到

    /*
     * Decoded, host-side form of a storage join request. The tracker fills this
     * in while handling TRACKER_PROTO_CMD_STORAGE_JOIN and passes it to
     * tracker_mem_add_group_and_storage().
     *
     * Unlike the wire-format join body, numeric values are native ints here and
     * the tracker list is an array of ConnectionInfo with room for
     * FDFS_MAX_TRACKERS entries.
     * NOTE(review): presumably only the first tracker_count entries of
     * tracker_servers are meaningful -- confirm against the decoder.
     */
    typedef struct
    {
    	int storage_port;
    	int storage_http_port;
    	int store_path_count;
    	int subdir_count_per_path;
    	int upload_priority;
    	int join_time; //storage join timestamp (create timestamp)
    	int up_time;   //storage service started timestamp
            char version[FDFS_VERSION_SIZE];   //storage version
    	char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
            char domain_name[FDFS_DOMAIN_NAME_MAX_SIZE];
            char init_flag;
    	signed char status;
    	int tracker_count;
    	ConnectionInfo tracker_servers[FDFS_MAX_TRACKERS];
    } FDFSStorageJoinBody;
    

    这个结构体内

    同时插入本地内存

    result = tracker_mem_add_group_and_storage(pClientInfo,
    pTask->client_ip, &joinBody, true);


    同时把同步源storage的id通过响应包返回

    	pJoinBodyResp = (TrackerStorageJoinBodyResp *)(pTask->data + 
    				sizeof(TrackerHeader));
    	memset(pJoinBodyResp, 0, sizeof(TrackerStorageJoinBodyResp));
    
    	if (pClientInfo->pStorage->psync_src_server != NULL)
    	{
    		strcpy(pJoinBodyResp->src_id, 
    			pClientInfo->pStorage->psync_src_server->id);
    	}


    5、报告存储状态


    #define TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS     76  //report specified storage server status

    storageserver调用

    int tracker_report_storage_status(ConnectionInfo *pTrackerServer, 
    		FDFSStorageBrief *briefServer)

    内容主要是组名字

    strcpy(out_buff + sizeof(TrackerHeader), g_group_name);

    和简要信息

    	memcpy(out_buff + sizeof(TrackerHeader) + FDFS_GROUP_NAME_MAX_LEN, 
    			briefServer, sizeof(FDFSStorageBrief));

    其结构体如下

    /*
     * Brief description of one storage server. tracker_report_storage_status()
     * copies sizeof(FDFSStorageBrief) bytes of it into the packet body with
     * memcpy, so the in-memory layout is also the on-wire layout.
     * All members are char or char arrays, so there is no padding.
     */
    typedef struct
    {
    	char status;
    	char port[4];	/* 4-byte buffer; presumably an int2buff-encoded port -- TODO confirm */
    	char id[FDFS_STORAGE_ID_MAX_SIZE];
    	char ip_addr[IP_ADDRESS_SIZE];
    } FDFSStorageBrief;
    


    6、从tracker获取storage状态。

    #define TRACKER_PROTO_CMD_STORAGE_GET_STATUS	    71  //get storage status from tracker

    该协议是由client发起

    调用流程如下:


    int tracker_get_storage_status(ConnectionInfo *pTrackerServer, 
    		const char *group_name, const char *ip_addr, 
    		FDFSStorageBrief *pDestBuff)
    int tracker_get_storage_max_status(TrackerServerGroup *pTrackerGroup, 
    		const char *group_name, const char *ip_addr, 
    		char *storage_id, int *status)	
    int tracker_get_storage_status(ConnectionInfo *pTrackerServer, 
    		const char *group_name, const char *ip_addr, 
    		FDFSStorageBrief *pDestBuff)

    获取自己的状态,

    包体格式:组名 + ip的字符串


    tracker根据获取到的数据,查找到storage的信息

    结构体为:

    /*
     * Brief description of one storage server, used here as the payload the
     * tracker fills in and returns for a "get storage status" request.
     * All members are char or char arrays, so there is no padding.
     */
    typedef struct
    {
    	char status;
    	char port[4];	/* 4-byte buffer; presumably an int2buff-encoded port -- TODO confirm */
    	char id[FDFS_STORAGE_ID_MAX_SIZE];
    	char ip_addr[IP_ADDRESS_SIZE];
    } FDFSStorageBrief;


    赋值后返回


    7、通过tracker获取storageid

    #define TRACKER_PROTO_CMD_STORAGE_GET_SERVER_ID    70  //get storage server id from tracker

    请求包体和上一个协议一样,由groupname+ip组成。


    tracker处理方法

    static int tracker_deal_get_storage_id(struct fast_task_info *pTask)

    tracker最后通过

    FDFSStorageIdInfo *fdfs_get_storage_id_by_ip(const char *group_name, 
    		const char *pIpAddr)
    {
    	FDFSStorageIdInfo target;
    	memset(&target, 0, sizeof(FDFSStorageIdInfo));
    	snprintf(target.group_name, sizeof(target.group_name), "%s", group_name);
    	snprintf(target.ip_addr, sizeof(target.ip_addr), "%s", pIpAddr);
    	return (FDFSStorageIdInfo *)bsearch(&target, g_storage_ids_by_ip, 
    		g_storage_id_count, sizeof(FDFSStorageIdInfo), 
    		fdfs_cmp_group_name_and_ip);
    }

    该方法获取到FDFSStorageIdInfo信息,然后赋值、返回。


    8、通过tracker获取全部storageserver

    #define TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS 69  //get all storage ids from tracker

    	for (i=0; i<5; i++)
    	{
    		for (pGServer=pServerStart; pGServer<pServerEnd; pGServer++)
    		{
    			memcpy(pTServer, pGServer, sizeof(ConnectionInfo));
    			pTServer->sock = -1;
    			result = fdfs_get_storage_ids_from_tracker_server(pTServer);
    			if (result == 0)
    			{
    				return result;
    			}
    		}
    
    		if (pServerStart != pTrackerGroup->servers)
    		{
    			pServerStart = pTrackerGroup->servers;
    		}
    		sleep(1);
    	}

    调用顺序

    int storage_func_init(const char *filename, char *bind_addr, const int addr_size)
    int fdfs_get_storage_ids_from_tracker_group(TrackerServerGroup *pTrackerGroup)
    int fdfs_get_storage_ids_from_tracker_server(ConnectionInfo *pTrackerServer)


    该函数最多重试5轮,每轮遍历全部的tracker server,两轮之间sleep 1秒


    trackerserver获取

    case TRACKER_PROTO_CMD_STORAGE_FETCH_STORAGE_IDS:
    			result = tracker_deal_fetch_storage_ids(pTask);
    			break;

    然后通过这样的协议格式返回数据:

    pIdsStart = g_storage_ids_by_ip + start_index;
    	pIdsEnd = g_storage_ids_by_ip + g_storage_id_count;
    	for (pIdInfo = pIdsStart; pIdInfo < pIdsEnd; pIdInfo++)
    	{
    		if ((int)(p - pTask->data) > pTask->size - 64)
    		{
    			break;
    		}
    
    		p += sprintf(p, "%s %s %s\n", pIdInfo->id, 
    			pIdInfo->group_name, pIdInfo->ip_addr);
    	}



     返回给请求者。


    9、回复给新的storage

    #define TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG       85  //repl new storage servers


    storageserver调用流程:



    剩下的协议


    static int tracker_merge_servers(ConnectionInfo *pTrackerServer, 
    		FDFSStorageBrief *briefServers, const int server_count)

    case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE:
    			result = tracker_deal_service_query_fetch_update( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE:
    			result = tracker_deal_service_query_fetch_update( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL:
    			result = tracker_deal_service_query_fetch_update( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE:
    			result = tracker_deal_service_query_storage( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE:
    			result = tracker_deal_service_query_storage( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL:
    			result = tracker_deal_service_query_storage( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL:
    			result = tracker_deal_service_query_storage( 
    					pTask, pHeader->cmd);
    			break;
    		case TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP:
    			result = tracker_deal_server_list_one_group(pTask);
    			break;
    		case TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS:
    			result = tracker_deal_server_list_all_groups(pTask);
    			break;
    		case TRACKER_PROTO_CMD_SERVER_LIST_STORAGE:
    			result = tracker_deal_server_list_group_storages(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ:
    			result = tracker_deal_storage_sync_src_req(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ:
    			TRACKER_CHECK_LOGINED(pTask)
    			result = tracker_deal_storage_sync_dest_req(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY:
    			result = tracker_deal_storage_sync_notify(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY:
    			result = tracker_deal_storage_sync_dest_query(pTask);
    			break;
    		case TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE:
    			result = tracker_deal_server_delete_storage(pTask);
    			break;
    		case TRACKER_PROTO_CMD_SERVER_SET_TRUNK_SERVER:
    			result = tracker_deal_server_set_trunk_server(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED:
    			result = tracker_deal_storage_report_ip_changed(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ:
    			result = tracker_deal_changelog_req(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ:
    			result = tracker_deal_parameter_req(pTask);
    			break;
    		case FDFS_PROTO_CMD_QUIT:
    			close(pTask->ev_read.ev_fd);
    			task_finish_clean_up(pTask);
    			return 0;
    		case FDFS_PROTO_CMD_ACTIVE_TEST:
    			result = tracker_deal_active_test(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_GET_STATUS:
    			result = tracker_deal_get_tracker_status(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START:
    			result = tracker_deal_get_sys_files_start(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE:
    			result = tracker_deal_get_one_sys_file(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END:
    			result = tracker_deal_get_sys_files_end(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID:
    			TRACKER_CHECK_LOGINED(pTask)
    			result = tracker_deal_report_trunk_fid(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID:
    			TRACKER_CHECK_LOGINED(pTask)
    			result = tracker_deal_get_trunk_fid(pTask);
    			break;
    		case TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE:
    			TRACKER_CHECK_LOGINED(pTask)
    			result = tracker_deal_report_trunk_free_space(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_PING_LEADER:
    			result = tracker_deal_ping_leader(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER:
    			result = tracker_deal_notify_next_leader(pTask);
    			break;
    		case TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER:
    			result = tracker_deal_commit_next_leader(pTask);
    			break;
    
































  • 相关阅读:
    kaggle之员工离职分析
    Titanic幸存预测分析(Kaggle)
    学习python,第五篇
    VLAN入门知识
    复习下VLAN的知识
    复习下网络七层协议
    学习python,第四篇:Python 3中bytes/string的区别
    学习python,第三篇:.pyc是个什么鬼?
    学习python,第二篇
    学习python,第一篇
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/5134902.html
Copyright © 2011-2022 走看看