zoukankan      html  css  js  c++  java
  • 【笔记】【Informatica】自定义序列生成器组件

    /**************************************************************************
     *
     * Copyright (c) 2003 Informatica Corporation.  This file contains
     * material proprietary to Informatica Corporation and may not be copied
     * or distributed in any form without the written permission of Informatica
     * Corporation
     *
     **************************************************************************/
    
    /**************************************************************************
     * Custom Transformation p_GetSeqVal Procedure File
     *
     * This file contains the functions that will be called by the main
     * server executable.
     *
     * For more information on these files,
     * see $(PM_HOME)/ExtProc/include/Readme.txt
     **************************************************************************/
    
    /**************************************************************************
                                   Includes
     **************************************************************************/
    
    #include "p_GetSeqVal.h"
    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <windows.h>
    #include <stdarg.h>
    #include <process.h>
    #include <io.h>
    
    /**************************************************************************
                                   UserDefineMacro
     **************************************************************************/
    
    /* File and process names used when building paths. */
    #define DLLNAME "SeqGenerator.dll"
    /* Executable name stripped from the end of the module path in procInit. */
    #define PMDTM "pmdtm.exe"
    /* Sub-directory, including its trailing separator, appended to the
       executable's directory.  The separator has to be written as an escaped
       backslash; a single one would escape the closing quote and leave the
       literal unterminated. */
    #define EXTPROCDIR "ExtProc\\"
    /* File the sequence table is loaded from and written back to. */
    #define SEQFILE "SeqGenerator.tsv"
    /* File opened around init/deinit to serialise access. */
    #define LOCKFILE "SeqGenerator.lock"
    /* Maximum sequence-name length, excluding the terminating NUL. */
    #define SEQNAMELEN 100
    /* Capacity of the sequence tables. */
    #define MAXSEQNUM 10000
    /* Type of a sequence value; read and written with "%lu". */
    typedef unsigned long int seqval_t;
    /* Size of the log-message and mutex-name buffers. */
    #define MSGCHARS 1000
    /* Line-buffer size used when scanning the sequence file. */
    #define MAX_LINE_SIZE 1000
    /* Prefix that makes this procedure's messages easy to find in the log. */
    #define MSGMARK ">>>>>>>>>>>>>>>"
    /* Name referenced by the commented-out mutex code. */
    #define PROCNAME "SeqGenerator"
    
    /**************************************************************************
                                   SharedVariables
     **************************************************************************/
    
    /* One named sequence together with the last value handed out for it. */
    struct seq_t{
        char chSeqName[SEQNAMELEN+1];
        seqval_t nSeqVal;
    };

    /* Fixed-capacity table of sequences; nSeqNums counts the slots in use. */
    struct seqlist_t{
        struct seq_t arrSequences[MAXSEQNUM];
        int nSeqNums;
    };

    /* Everything between the two data_seg pragmas is placed in the
       "SeqGeneratorShared" section, which the linker directive below marks
       read/write/shared.
       nInitFlag and nRefCount are volatile because procInit polls nInitFlag in
       a loop and both are modified by other users of the section; without the
       qualifier the compiler may legally read them once and never again.
       NOTE(review): the PE section-name field is 8 bytes wide - confirm the
       linker treats this longer name as intended. */
    #pragma data_seg("SeqGeneratorShared")
    static volatile int nInitFlag=0;   /* set to 1 once the sequence file is loaded */
    static volatile int nRefCount=0;   /* procInit increments, procDeinit decrements */
    static struct seqlist_t strSeqList={0};
    #pragma data_seg()

    #pragma comment(linker,"/section:SeqGeneratorShared,rws") 
    
    /**************************************************************************
                                   Debug switch
     **************************************************************************/
    
    /* Uncomment the next line to define DEBUG_MODE.  The conditional section
       below currently contains only a commented-out LogMessage call, so
       defining the macro has no effect on the code shown here. */
    //#define DEBUG_MODE
    
    #ifdef DEBUG_MODE
        //LogMessage("%s DebugLog:",MSGMARK);
    #endif
    
    /**************************************************************************
                                   Global Variables
     **************************************************************************/
    
    /* Buffer LogMessage formats into before handing the text to the server. */
    char chLogMsg[MSGCHARS];
    /* Not referenced by the code visible in this file. */
    char chMutexName[MSGCHARS];

    /* Paths derived in procInit from the running executable's location. */
    TCHAR szPath[MAX_PATH];
    TCHAR dllDir[MAX_PATH];
    TCHAR SeqFilePath[MAX_PATH];
    TCHAR LockFilePath[MAX_PATH];

    /* Streams for the sequence file and the lock file. */
    FILE *fpSeqFile;
    FILE *fpLockFile;

    /* Per-row scratch: the incoming sequence name and the value sent out. */
    char *rowSeqName;
    char chSeqName[SEQNAMELEN+1];
    seqval_t nSeqVal;

    /* Cache entry: a sequence name and a pointer to its value slot in the
       shared table, so repeated names skip the search of that table. */
    struct seqcache_t{
        char chSeqName[SEQNAMELEN+1];
        seqval_t *ptrSeqVal;
    };

    /* Cache of the names already resolved; nSeqNums counts the used entries. */
    struct seqlistcache_t{
        struct seqcache_t arrSequences[MAXSEQNUM];
        int nSeqNums;
    };

    struct seqlistcache_t strSeqListCache={0};
    //HANDLE hMutex;
    
    /**************************************************************************
                                   Functions
     **************************************************************************/
    
    /* Formats a printf-style message into chLogMsg and writes it to the
       session log through INFA_CTLogMessageM.  Output that does not fit in
       chLogMsg is truncated instead of overrunning the buffer.
       fmt - printf-style format string, followed by its arguments.
       Always returns 0. */
    int LogMessage(char *fmt,...){
        va_list ptrArg;
        va_start(ptrArg,fmt);
        vsnprintf(chLogMsg,sizeof chLogMsg,fmt,ptrArg);
        va_end(ptrArg);
        /* Guarantee termination even if the formatter reported an error. */
        chLogMsg[sizeof chLogMsg-1]='\0';
        INFA_CTLogMessageM( eESL_LOG,chLogMsg);
        return 0;
    }
    
    /* Counts the lines in an open stream and leaves it positioned at the start.
       A final line without a trailing newline counts as a line.  Lines are
       counted by their newline characters, so a line of any length counts
       exactly once.
       fp - stream to scan; NULL is treated as an empty file.
       Returns the number of lines. */
    int GetFileNumRows(FILE* fp){
        int nRows = 0;
        int ch;
        int chPrev = '\n';   /* start of file behaves like "just after a newline" */

        if (fp == NULL)
            return 0;

        fseek(fp,0,SEEK_SET);
        while ((ch = fgetc(fp)) != EOF) {
            if (ch == '\n')
                nRows++;
            chPrev = ch;
        }
        /* Unterminated last line. */
        if (chPrev != '\n')
            nRows++;
        fseek(fp,0,SEEK_SET);
        return nRows;
    }
    
    /* Copies the first n characters of src into dst and NUL-terminates dst.
       n is clamped to the range [0, strlen(src)].
       dst - destination buffer, must hold at least min(n, strlen(src)) + 1 bytes.
       src - NUL-terminated source string.
       Returns dst. */
    char * left(char *dst,char *src, int n){
        int len = (int)strlen(src);
        if(n<0) n = 0;       /* a negative count would otherwise never reach zero */
        if(n>len) n = len;
        memmove(dst,src,(size_t)n);
        dst[n]='\0';
        return dst;
    }
    
    /* Copies up to m characters of src, starting at offset n, into dst and
       NUL-terminates dst.
       dst - destination buffer, must hold the copied characters plus the NUL.
       src - NUL-terminated source string.
       n   - start offset; negative values are treated as 0.
       m   - number of characters; clamped to what remains after offset n,
             negative values are treated as 0.
       Returns dst, or NULL (dst untouched) when n is past the end of src. */
    char * mid(char *dst,char *src, int n,int m) {
        int len = (int)strlen(src);
        /* Normalise the offset first so the length clamp below uses the
           offset that is actually applied. */
        if(n<0) n=0;
        if(n>len) return NULL;
        if(m<0) m = 0;
        if(m>len-n) m = len-n;
        memmove(dst,src+n,(size_t)m);
        dst[m]='\0';
        return dst;
    }
    
    /*
    INFA_STATUS mGetMutex(){
        while(1){
    	    hMutex=CreateMutex(NULL,FALSE,PROCNAME);
    		if(hMutex&&GetLastError()==ERROR_ALREADY_EXISTS){
    		    if(hMutex!=NULL){
    			    CloseHandle(hMutex);
    			}
    			Sleep(1000);
    			continue;
    		} 
    		else if(hMutex!=NULL){
    		    break;
    		}
    		else {
    			return INFA_FAILURE;
    		}
    	}
    	return INFA_SUCCESS;
    }
    */
    
    /**************************************************************************
       Function: p_GetSeqVal_procInit
    
     Description: Initialization for the procedure.  Returns INFA_SUCCESS if 
     procedure initialization succeeds, else return INFA_FAILURE.
    
     Input: procedure - the handle for the procedure
     Output: None
     Remarks: This function will get called once for the session at
     initialization time.  It will be called after the moduleInit function.
     **************************************************************************/
    
    /* fscanf format for one "<name><TAB><value>" row.  The name field width is
       generated from SEQNAMELEN so a long name can never overrun chSeqName. */
    #define SEQ_STRINGIFY_(x) #x
    #define SEQ_STRINGIFY(x) SEQ_STRINGIFY_(x)
    #define SEQ_ROW_SCAN_FORMAT "%" SEQ_STRINGIFY(SEQNAMELEN) "s\t%lu\n"

    INFA_STATUS p_GetSeqVal_procInit( INFA_CT_PROCEDURE_HANDLE procedure)
    {
        /* Steps:
           1. derive the ExtProc directory and the sequence/lock file paths from
              the running executable's path;
           2. take a byte-range lock on the lock file;
           3. the first caller (reference count 0 -> 1) loads the sequence file
              into the shared table, later callers wait until it is loaded;
           4. release the lock.
           NOTE(review): on a failure after the reference count was incremented
           the count is left as it is, as in the original code - confirm whether
           the server still calls procDeinit after a failed procInit. */
        INFA_STATUS status = INFA_SUCCESS;
        HANDLE hLockFile;
        DWORD nPathLen;
        DWORD dwErr;
        size_t nExeLen = strlen(PMDTM);
        size_t nDirLen;

        //Sleep(10000);
        INFA_CTChangeStringMode( procedure, eASM_MBCS );

        nPathLen = GetModuleFileName( NULL, szPath, MAX_PATH );
        if( nPathLen == 0 ){
            LogMessage("GetModuleFileName failed (%d)\n", (int)GetLastError());
            return INFA_FAILURE;
        }
        /* A return value equal to the buffer size means the path was truncated;
           a path shorter than the executable name cannot be stripped. */
        if( nPathLen >= MAX_PATH || nPathLen < nExeLen ){
            LogMessage("ModuleFileName has an unusable length (%d)\n", (int)nPathLen);
            return INFA_FAILURE;
        }
        LogMessage("ModuleFileName is : %s\n", szPath);

        nDirLen = nPathLen - nExeLen;
        if( nDirLen + strlen(EXTPROCDIR) + strlen(SEQFILE) >= MAX_PATH
            || nDirLen + strlen(EXTPROCDIR) + strlen(LOCKFILE) >= MAX_PATH ){
            LogMessage("ModuleDirectory is too long to hold the ExtProc file paths\n");
            return INFA_FAILURE;
        }

        /* Drop the trailing executable name, then append the sub-directory. */
        mid(dllDir,szPath,0,(int)nDirLen);
        strcat(dllDir,EXTPROCDIR);
        LogMessage("ModuleDirectory is : %s\n", dllDir);

        strcpy(SeqFilePath,dllDir);
        strcat(SeqFilePath,SEQFILE);
        LogMessage("Sequence File is : %s\n", SeqFilePath);

        strcpy(LockFilePath,dllDir);
        strcat(LockFilePath,LOCKFILE);
        LogMessage("Lock File is : %s\n", LockFilePath);

        //CreateMutex(NULL,FALSE,PROCNAME);
        //mGetMutex();
        //WaitForSingleObject(hMutex, INFINITE);

        /* Append mode creates the file when missing and never truncates it. */
        fpLockFile=fopen(LockFilePath,"a");
        if( fpLockFile==NULL ){
            LogMessage("Open Lock File Failed!\n" );
            return INFA_FAILURE;
        }
        /* LockFile/UnlockFile operate on the OS handle, not on the FILE*. */
        hLockFile=(HANDLE)_get_osfhandle(_fileno(fpLockFile));
        if( hLockFile==INVALID_HANDLE_VALUE ){
            LogMessage("Lock File has no usable OS handle!\n" );
            fclose(fpLockFile);
            return INFA_FAILURE;
        }
        /* LockFile returns nonzero on success; retry only while the range is
           held by someone else. */
        while( !LockFile(hLockFile,1,1,1,1) ){
            dwErr=GetLastError();
            if( dwErr!=ERROR_LOCK_VIOLATION ){
                LogMessage("Lock File could not be locked (%d)\n", (int)dwErr);
                fclose(fpLockFile);
                return INFA_FAILURE;
            }
            Sleep(2000);
        }

        if(1==++nRefCount){
            int nFileNumRows,i;
            LogMessage("%s Loading Sequence File\n",MSGMARK);
            fpSeqFile=fopen(SeqFilePath,"a+");
            if( fpSeqFile==NULL ){
                LogMessage("Open Sequence File Failed!\n" );
                status=INFA_FAILURE;
                goto unlock;
            }
            nFileNumRows=GetFileNumRows(fpSeqFile);
            LogMessage("%s Sequence Objects Nums: %d\n",MSGMARK,nFileNumRows);
            if( nFileNumRows>MAXSEQNUM ){
                LogMessage("Sequence File has more than %d rows\n",MAXSEQNUM);
                fclose(fpSeqFile);
                status=INFA_FAILURE;
                goto unlock;
            }
            strSeqList.nSeqNums=0;
            for(i=0;i<nFileNumRows;i++){
                /* Stop at the first row that does not parse (for example blank
                   trailing lines) instead of storing the previous row again. */
                if( 2!=fscanf(fpSeqFile,SEQ_ROW_SCAN_FORMAT,chSeqName,&nSeqVal) ){
                    break;
                }
                strcpy(strSeqList.arrSequences[i].chSeqName,chSeqName);
                strSeqList.arrSequences[i].nSeqVal=nSeqVal;
                strSeqList.nSeqNums++;
            }
            if(EOF==fclose(fpSeqFile)){
                LogMessage("Close Sequence File Failed!\n" );
                status=INFA_FAILURE;
                goto unlock;
            }
            nInitFlag=1;
            //CloseHandle(hMutex);
            LogMessage("%s Finish loading Sequence File",MSGMARK);
        } else {
            LogMessage("%s Wait for loading Sequence File",MSGMARK);
            /* Read through a volatile lvalue so the flag is re-read on every
               pass, and yield the CPU between polls. */
            while(1!=*(volatile int *)&nInitFlag){
                Sleep(100);
            }
            LogMessage("%s Finish loading Sequence File",MSGMARK);
        }

    unlock:
        /* Reached on success and on every failure after the lock was taken. */
        UnlockFile(hLockFile,1,1,1,1);
        if(EOF==fclose(fpLockFile)){
            LogMessage("Close Lock File Failed!\n" );
            return INFA_FAILURE;
        }
        //ReleaseMutex(hMutex);

        return status;
    }
    
    
    /**************************************************************************
       Function: p_GetSeqVal_procDeinit
    
     Description: Deinitialization for the procedure.  Returns INFA_SUCCESS if 
     procedure deinitialization succeeds, else return INFA_FAILURE.
    
     Input: procedure - the handle for the procedure
     Output: None
     Remarks: This function will get called once for the session at
     deinitialization time.  It will be called before the moduleDeinit
     function.
     **************************************************************************/
    
    INFA_STATUS p_GetSeqVal_procDeinit( INFA_CT_PROCEDURE_HANDLE procedure, INFA_STATUS sessionStatus )
    {
        /* Takes the lock file, decrements the reference count and, when this
           was the last user, writes the whole shared table back to the
           sequence file as "<name><TAB><value>" rows.  The lock is released on
           every path once it has been taken.  Both parameters are unused. */
        INFA_STATUS status = INFA_SUCCESS;
        HANDLE hLockFile;
        DWORD dwErr;

        (void)procedure;
        (void)sessionStatus;

        //mGetMutex();
        //WaitForSingleObject(hMutex, INFINITE);

        /* Append mode creates the file when missing and never truncates it. */
        fpLockFile=fopen(LockFilePath,"a");
        if( fpLockFile==NULL ){
            LogMessage("Open Lock File Failed!\n" );
            return INFA_FAILURE;
        }
        /* LockFile/UnlockFile operate on the OS handle, not on the FILE*. */
        hLockFile=(HANDLE)_get_osfhandle(_fileno(fpLockFile));
        if( hLockFile==INVALID_HANDLE_VALUE ){
            LogMessage("Lock File has no usable OS handle!\n" );
            fclose(fpLockFile);
            return INFA_FAILURE;
        }
        /* LockFile returns nonzero on success; retry only while the range is
           held by someone else. */
        while( !LockFile(hLockFile,1,1,1,1) ){
            dwErr=GetLastError();
            if( dwErr!=ERROR_LOCK_VIOLATION ){
                LogMessage("Lock File could not be locked (%d)\n", (int)dwErr);
                fclose(fpLockFile);
                return INFA_FAILURE;
            }
            Sleep(2000);
        }

        if(0==--nRefCount){
            int i;
            LogMessage("%s Writing Sequence File",MSGMARK);
            fpSeqFile=fopen(SeqFilePath,"w");
            if( fpSeqFile==NULL ){
                LogMessage("Open Sequence File Failed!\n" );
                status=INFA_FAILURE;
            } else {
                for(i=0;i<strSeqList.nSeqNums;i++){
                    if( fprintf(fpSeqFile,"%s\t%lu\n",strSeqList.arrSequences[i].chSeqName,strSeqList.arrSequences[i].nSeqVal)<0 ){
                        LogMessage("Write Sequence File Failed!\n" );
                        status=INFA_FAILURE;
                        break;
                    }
                }
                if(EOF==fclose(fpSeqFile)){
                    LogMessage("Close Sequence File Failed!\n" );
                    status=INFA_FAILURE;
                }
                if( status==INFA_SUCCESS ){
                    LogMessage("%s Finish Writing Sequence File",MSGMARK);
                }
            }
        }

        UnlockFile(hLockFile,1,1,1,1);
        if(EOF==fclose(fpLockFile)){
            LogMessage("Close Lock File Failed!\n" );
            return INFA_FAILURE;
        }
        //ReleaseMutex(hMutex);
        //CloseHandle(hMutex);
        return status;
    }
    
    
    /**************************************************************************
       Function: p_GetSeqVal_partitionInit
    
     Description: Initialization for the partition.  Returns INFA_SUCCESS if 
     partition initialization succeeds, else return INFA_FAILURE.
    
     Input: partition - the handle for the partition
     Output: None
     Remarks: This function will get called once for each partition for each
     transformation in the session.
     **************************************************************************/
    
    INFA_STATUS p_GetSeqVal_partitionInit( INFA_CT_PARTITION_HANDLE partition )
    {
        /* No per-partition setup is performed: the handle is ignored and
           success is reported unconditionally. */
        (void)partition;
        return INFA_SUCCESS;
    }
    
    
    /**************************************************************************
       Function: p_GetSeqVal_partitionDeinit
    
     Description: Deinitialization for the partition.  Returns INFA_SUCCESS if 
     partition deinitialization succeeds, else return INFA_FAILURE.
    
     Input: partition - the handle for the partition
     Output: None
     Remarks: This function will get called once for each partition for each
     transformation in the session.
     **************************************************************************/
    
    INFA_STATUS p_GetSeqVal_partitionDeinit( INFA_CT_PARTITION_HANDLE partition )
    {
        /* No per-partition teardown is performed: the handle is ignored and
           success is reported unconditionally. */
        (void)partition;
        return INFA_SUCCESS;
    }
    
    
    /**************************************************************************
       Function: p_GetSeqVal_inputRowNotification
    
     Description: Notification that a row needs to be processed for an input
     group in a transformation for the given partition.  Returns INFA_ROWSUCCESS
     if the input row was processed successfully, INFA_ROWERROR if the input
     row was not processed successfully and INFA_FATALERROR if the input row
     causes the session to fail.
    
     Input: partition - the handle for the partition for the given row
            inputGroup - the handle for the input group for the given row
     Output: None
     Remarks: This function is probably where the meat of your code will go,
     as it is called for every row that gets sent into your transformation. 
     **************************************************************************/
    
    INFA_ROWSTATUS p_GetSeqVal_inputRowNotification( INFA_CT_PARTITION_HANDLE partition,
                                        INFA_CT_INPUTGROUP_HANDLE inputGroup )
    {
        /* Reads the sequence name from the first input port, increments that
           sequence and sends the new value to the first port of the first
           output group.
           Lookup order: the process-local cache first, then the shared table;
           a name found in neither is appended to the shared table starting
           from 0, so its first issued value is 1.
           Returns INFA_ROWERROR when the name is missing, is longer than
           SEQNAMELEN, or the shared table is full; otherwise returns the
           result of INFA_CTOutputNotification.
           NOTE(review): the shared table is updated without any lock or atomic
           operation here - confirm whether concurrent sessions or partitions
           can reach this function at the same time. */
        const INFA_CT_OUTPUTGROUP_HANDLE* outputGroups = NULL;
        const INFA_CT_INPUTPORT_HANDLE* inputGroupPorts = NULL;
        const INFA_CT_OUTPUTPORT_HANDLE* outputGroupPorts = NULL;
        size_t nNumInputPorts = 0, nNumOutputGroups = 0, nNumPortsInOutputGroup = 0;
        int i,j;

        outputGroups = INFA_CTGetChildrenHandles(partition,&nNumOutputGroups,OUTPUTGROUPTYPE);
        outputGroupPorts = INFA_CTGetChildrenHandles(outputGroups[0],&nNumPortsInOutputGroup,OUTPUTPORTTYPE);
        inputGroupPorts = INFA_CTGetChildrenHandles(inputGroup,&nNumInputPorts,INPUTPORTTYPE);

        rowSeqName=(char*)INFA_CTGetDataVoid(inputGroupPorts[0]);
        /* The name is copied into SEQNAMELEN+1 byte fields below. */
        if(rowSeqName==NULL || strlen(rowSeqName)>SEQNAMELEN){
            LogMessage("%s Sequence name is missing or longer than %d characters",MSGMARK,SEQNAMELEN);
            return INFA_ROWERROR;
        }

        for(j=0;j<strSeqListCache.nSeqNums;j++){
            if(strcmp(rowSeqName,strSeqListCache.arrSequences[j].chSeqName)==0)
                break;
        }
        if(j<strSeqListCache.nSeqNums){
            /* Cache hit: bump the shared value through the cached pointer. */
            nSeqVal=++*(strSeqListCache.arrSequences[j].ptrSeqVal);
        } else {
            for(i=0;i<strSeqList.nSeqNums;i++){
                if(strcmp(rowSeqName,strSeqList.arrSequences[i].chSeqName)==0)
                    break;
            }
            if(i==strSeqList.nSeqNums){
                if(i>=MAXSEQNUM){
                    LogMessage("%s Sequence table is full (%d entries)",MSGMARK,MAXSEQNUM);
                    return INFA_ROWERROR;
                }
                /* New sequence: fill the slot completely before publishing it
                   by incrementing the count. */
                strcpy(strSeqList.arrSequences[i].chSeqName,rowSeqName);
                strSeqList.arrSequences[i].nSeqVal=0;
                strSeqList.nSeqNums++;
            }
            nSeqVal=++(strSeqList.arrSequences[i].nSeqVal);
            /* Cache the name exactly once, whether it was new or already in
               the shared table. */
            if(j<MAXSEQNUM){
                strcpy(strSeqListCache.arrSequences[j].chSeqName,rowSeqName);
                strSeqListCache.arrSequences[j].ptrSeqVal=&(strSeqList.arrSequences[i].nSeqVal);
                strSeqListCache.nSeqNums++;
            }
        }

        INFA_CTSetData(outputGroupPorts[0], &nSeqVal);
        INFA_CTSetIndicator(outputGroupPorts[0],INFA_DATA_VALID);
        return INFA_CTOutputNotification(outputGroups[0]);
    }
    
    
    /**************************************************************************
       Function: p_GetSeqVal_eofNotification
    
     Description: Notification that the last row for an input group has already
     been seen.  Return INFA_FAILURE if the session should fail as a result of
     seeing this notification, INFA_SUCCESS otherwise.
    
     Input: partition - the handle for the partition for the notification
            group - the handle for the input group for the notification
     Output: None
     **************************************************************************/
    
    INFA_STATUS p_GetSeqVal_eofNotification( INFA_CT_PARTITION_HANDLE partition,
                                INFA_CT_INPUTGROUP_HANDLE group)
    {
        /* End of input needs no action here: both handles are ignored and
           success is reported unconditionally. */
        (void)partition;
        (void)group;
        return INFA_SUCCESS;
    }
    
    
    /**************************************************************************
       Function: p_GetSeqVal_dataBdryNotification
    
     Description: Notification that a transaction has ended.  The data
     boundary type can either be commit or rollback.
     Return INFA_FAILURE if the session should fail as a result of
     seeing this notification, INFA_SUCCESS otherwise.
    
     Input: partition - the handle for the partition for the notification
            transactionType - commit or rollback 
     Output: None
     **************************************************************************/
    
    INFA_STATUS p_GetSeqVal_dataBdryNotification ( 
                           INFA_CT_PARTITION_HANDLE partition,
                           INFA_CT_DATABDRY_TYPE transactionType)
    {
        /* Transaction boundaries need no action here: both arguments are
           ignored and success is reported unconditionally. */
        (void)partition;
        (void)transactionType;
        return INFA_SUCCESS;
    }
    

      

  • 相关阅读:
    Markdown学习笔记
    带下划线点域名解析失败
    前端工程师学习之路
    Java 调用 WebService 客户端代码 含通过代理调用
    MySQL 日期函数 时间函数 总结 (MySQL 5_X)
    Apache、Tomcat整合环境搭建
    201671010142 <java程序设计>初次学习心得与感悟
    201671010142 Java基本程序设计结构学习的感悟
    201671010142.第五章的学习总结
    201671010142 继承定义与使用 感悟与总结
  • 原文地址:https://www.cnblogs.com/AzikPhil/p/infa_dll_c.html
Copyright © 2011-2022 走看看