根据rfc4533,LDAP服务支持同步操作,今天来探究下同步该如何进行;
同步适用的场景
从rfc4533 1.2 可以了解到,同步操作用于应用与LDAP的最终一致性同步。在每次同步操作完成的阶段,同步客户端将会获取到服务端内容的复制或者客户但将会被通知需要一个完整的全局信息更新。除了由于服务端并发操作或者其他处理导致的瞬时不一致,客户端的同步备份是一份精确的服务端数据的反射。瞬间状态的不一致会通过后来的同步操作解决。
同步可能会用来作为如下场景:
- 白页应用服务可能会使用同步操作去维护一份目录信息碎片的事实拷贝。比如说:一个邮件用户代理使用同步操作去维护一个本地的企业地址簿拷贝。
- 元信息引擎会使用操作去维护一个目录信息树(DIT)的拷贝。
- 缓存代理服务可能会使用同步操作维护一个连贯的一致性的内容缓存。
- 两台异构的目录服务间的轻量主备同步。比如:同步操作可以被备份的ldap服务去维护一个备份的目录信息(DIT)片段。
rfc4533所描述的协议不会计划被使用到需要有事物一致性的应用中。
在协议中传输了变更节点(Entry)所有可见的值而不是增量,所以这个协议不适合变更亮特别大的应用和部署环境。
同步的模式
同步的模式分为如下两种
- polling for Changes (refreshOnly)客户端主动拉取
- Listening for Changes(refreshAndPersist)客户端拉取建立连接后,服务端会主动推送变更。
同步基础的元素
- syncUUID 节点的唯一ID
- syncCookie同步的状态Cookie,可以用来表示当前存储的状态。没有Cookie的代表第一次初始化同步。
- Sync Request Control 同步控制器,在Search Request Message中带上代表客户端想进行同步操作。
- Sync State Control 同步结果控制器,在SearchResultEntry 和
SearchResultReference Messages中存在。用来代表同步的返回信息。 - Sync Done Control (非关键性)代表同步的结束信息。
- Sync Info Message (非关键性)LDAP中间响应信息。
- Sync Result Codes 同步结果码
jndi中并没有包含这部分代码,所以找java的实现找了很久,最后在apache ldap中找到了同步的实现代码;现在就粘贴出来解析下;代码在
https://github.com/apache/directory-server/tree/4077a74e6ee878019ab1245fd80718781a37ef40/protocol-ldap中可以获取到。类名为:ReplicationConsumerImpl
/** * Starts the synchronization operation */ public ReplicationStatusEnum startSync() { CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId() ); // read the cookie if persisted //获取上次同步完后的cookie readCookie(); if ( config.isRefreshNPersist() ) { try { CONSUMER_LOG.debug( "==================== Refresh And Persist ==========" ); //RefreshNPersist模式下的查询 return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, reload ); } catch ( Exception e ) { CONSUMER_LOG.error( "Failed to sync with refreshAndPersist mode", e ); return ReplicationStatusEnum.DISCONNECTED; } } else { //refreshOnly处理 return doRefreshOnly(); } }
先来看看refreshOnly模式的处理
private ReplicationStatusEnum doRefreshOnly() { while ( !disconnected ) { CONSUMER_LOG.debug( "==================== Refresh Only ==========" ); try { //while轮询进行查询,每次按照配置的时间进行查询拉取 doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, reload ); CONSUMER_LOG.debug( "--------------------- Sleep for {} seconds ------------------", ( config.getRefreshInterval() / 1000 ) ); Thread.sleep( config.getRefreshInterval() ); CONSUMER_LOG.debug( "--------------------- syncing again ------------------" ); } catch ( InterruptedException ie ) { CONSUMER_LOG.warn( "refresher thread interrupted" ); return ReplicationStatusEnum.DISCONNECTED; } catch ( Exception e ) { CONSUMER_LOG.error( "Failed to sync with refresh only mode", e ); return ReplicationStatusEnum.DISCONNECTED; } } return ReplicationStatusEnum.STOPPED; }
private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception { CONSUMER_LOG.debug( "Starting synchronization mode {}, reloadHint {}", syncType, reloadHint ); // Prepare the Syncrepl Request //同步查询的SyncRequestControl准备 SyncRequestValue syncReq = new SyncRequestValueDecorator( directoryService.getLdapCodecService() ); syncReq.setMode( syncType ); syncReq.setReloadHint( reloadHint ); //对若已同步过并存储了cookie的处理 // If we have a persisted cookie, send it. if ( syncCookie != null ) { CONSUMER_LOG.debug( "searching on {} with searchRequest, cookie '{}'", config.getProducer(), Strings.utf8ToString( syncCookie ) ); syncReq.setCookie( syncCookie ); } else { CONSUMER_LOG.debug( "searching on {} with searchRequest, no cookie", config.getProducer() ); } searchRequest.addControl( syncReq ); //使用异步查询接口 // Do the search. We use a searchAsync because we want to get SearchResultDone responses SearchFuture sf = connection.searchAsync( searchRequest ); Response resp = sf.get(); CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp ); // Now, process the responses. We loop until we have a connection termination or // a SearchResultDone (RefreshOnly mode) //一直执行异步查询,除非受到refreshOnly模式下的 searchResultDone结果。 while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() && !disconnected ) { if ( resp instanceof SearchResultEntry ) { SearchResultEntry result = ( SearchResultEntry ) resp; handleSearchResultEntry( result ); } else if ( resp instanceof SearchResultReference ) { handleSearchReference( ( SearchResultReference ) resp ); } else if ( resp instanceof IntermediateResponse ) { handleSyncInfo( ( IntermediateResponse ) resp ); } // Next entry resp = sf.get(); CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp ); } if ( sf.isCancelled() ) { CONSUMER_LOG.debug( "Search sync on {} has been canceled ", config.getProducer(), sf.getCause() ); return ReplicationStatusEnum.DISCONNECTED; } else if ( disconnected ) { CONSUMER_LOG.debug( "Disconnected from {}", config.getProducer() ); return ReplicationStatusEnum.DISCONNECTED; } else { //对searchResultDone的处理 ResultCodeEnum resultCode = handleSearchResultDone( ( SearchResultDone ) resp ); CONSUMER_LOG.debug( "Rsultcode of Sync operation from {} : {}", config.getProducer(), resultCode ); //情况一,没有这个对象的报错 if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT ) { // log the error and handle it appropriately CONSUMER_LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(), config.getProducer() ); CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from provider {}", config.getProducer() ); disconnect(); return ReplicationStatusEnum.DISCONNECTED; } else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED ) { //情况二,需要进行一次全量刷新,全量刷新需要刷新cookie CONSUMER_LOG.warn( "Full SYNC_REFRESH required from {}", config.getProducer() ); reload = true; try { CONSUMER_LOG.debug( "Deleting baseDN {}", config.getBaseDn() ); // FIXME taking a backup right before deleting might be a good thing, just to be safe. // the backup file can be deleted after reload completes successfully // the 'rid' value is not taken into consideration when 'reload' is set // so any dummy value is fine deleteRecursive( new Dn( config.getBaseDn() ), -1000 ); } catch ( Exception e ) { CONSUMER_LOG .error( "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer", e ); } // Do a full update. removeCookie(); CONSUMER_LOG.debug( "Re-doing a syncRefresh from producer {}", config.getProducer() ); return ReplicationStatusEnum.REFRESH_REQUIRED; } else { CONSUMER_LOG.debug( "Got result code {} from producer {}. Replication stopped", resultCode, config.getProducer() ); return ReplicationStatusEnum.DISCONNECTED; } } }
private void handleSearchResultEntry( SearchResultEntry syncResult ) { CONSUMER_LOG.debug( "------------- starting handleSearchResult ------------" ); //从searchResultEntry中获取SyncStateValue SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID ); try { Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() ); String uuid = remoteEntry.get( directoryService.getAtProvider().getEntryUUID() ).getString(); // lock on UUID to serialize the updates when there are multiple consumers // connected to several producers and to the *same* base/partition Object lock = getLockFor( uuid ); synchronized ( lock ) { int rid = -1; if ( syncStateCtrl.getCookie() != null ) { syncCookie = syncStateCtrl.getCookie(); rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) ); CONSUMER_LOG.debug( "assigning the cookie from sync state value control: {}", Strings.utf8ToString( syncCookie ) ); } //从中获取变更状态 SyncStateTypeEnum state = syncStateCtrl.getSyncStateType(); // check to avoid conversion of UUID from byte[] to String if ( CONSUMER_LOG.isDebugEnabled() ) { CONSUMER_LOG.debug( "state name {}", state.name() ); CONSUMER_LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) ); } Dn remoteDn = remoteEntry.getDn(); switch ( state ) { case ADD: boolean remoteDnExist = false; try { remoteDnExist = session.exists( remoteDn ); } catch ( LdapNoSuchObjectException lnsoe ) { CONSUMER_LOG.error( lnsoe.getMessage() ); } if ( !remoteDnExist ) { CONSUMER_LOG.debug( "adding entry with dn {}", remoteDn ); CONSUMER_LOG.debug( remoteEntry.toString() ); AddOperationContext addContext = new AddOperationContext( session, remoteEntry ); addContext.setReplEvent( true ); addContext.setRid( rid ); OperationManager operationManager = directoryService.getOperationManager(); operationManager.add( addContext ); } else { CONSUMER_LOG.debug( "updating entry in refreshOnly mode {}", remoteDn ); modify( remoteEntry, rid ); } break; case MODIFY: CONSUMER_LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() ); modify( remoteEntry, rid ); break; case MODDN: String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() ).toString(); applyModDnOperation( remoteEntry, entryUuid, rid ); break; case DELETE: CONSUMER_LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() ); if ( !session.exists( remoteDn ) ) { CONSUMER_LOG .debug( "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete", remoteDn ); } else { // incase of a MODDN operation resulting in a branch to be moved out of scope // ApacheDS replication provider sends a single delete event on the Dn of the moved branch // so the branch needs to be recursively deleted here deleteRecursive( remoteEntry.getDn(), rid ); } break; case PRESENT: CONSUMER_LOG.debug( "entry present {}", remoteEntry ); break; default: throw new IllegalArgumentException( "Unexpected sync state " + state ); } //处理成功后保存下cookie // store the cookie only if the above operation was successful if ( syncStateCtrl.getCookie() != null ) { storeCookie(); } } } catch ( Exception e ) { CONSUMER_LOG.error( e.getMessage(), e ); } CONSUMER_LOG.debug( "------------- Ending handleSearchResult ------------" ); }
private void handleSyncInfo( IntermediateResponse syncInfoResp ) { try { CONSUMER_LOG.debug( "............... inside handleSyncInfo ..............." ); //从response中获取Sync Info Message,其中会包含删除了节点的信息 byte[] syncInfoBytes = syncInfoResp.getResponseValue(); if ( syncInfoBytes == null ) { return; } SyncInfoValueDecorator decorator = new SyncInfoValueDecorator( directoryService.getLdapCodecService() ); SyncInfoValue syncInfoValue = ( SyncInfoValue ) decorator.decode( syncInfoBytes ); byte[] cookie = syncInfoValue.getCookie(); if ( CONSUMER_LOG.isDebugEnabled() ) { CONSUMER_LOG.debug( "Received a SyncInfoValue from producer {} : {}", config.getProducer(), syncInfoValue ); } int replicaId = -1; if ( cookie != null ) { CONSUMER_LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) ); CONSUMER_LOG.debug( "setting the cookie from the sync info: " + Strings.utf8ToString( cookie ) ); syncCookie = cookie; String cookieString = Strings.utf8ToString( syncCookie ); replicaId = LdapProtocolUtils.getReplicaId( cookieString ); } CONSUMER_LOG.info( "refreshDeletes: " + syncInfoValue.isRefreshDeletes() ); //获取删除了的节点的uuid List<byte[]> uuidList = syncInfoValue.getSyncUUIDs(); // if refreshDeletes set to true then delete all the entries with entryUUID // present in the syncIdSet if ( syncInfoValue.isRefreshDeletes() ) { deleteEntries( uuidList, false, replicaId ); } else { deleteEntries( uuidList, true, replicaId ); } CONSUMER_LOG.info( "refreshDone: " + syncInfoValue.isRefreshDone() ); storeCookie(); } catch ( Exception de ) { CONSUMER_LOG.error( "Failed to handle syncinfo message", de ); } CONSUMER_LOG.debug( ".................... END handleSyncInfo ..............." ); }
根据如上代码进行处理,一个自定义的ldap同步sdk就可以出炉了;