zoukankan      html  css  js  c++  java
  • SDP(10):文本式大数据运算环境-MongoDB-Engine功能设计

        为了让前面规划的互联网+数据平台能有效对电子商务数据进行管理及实现大数据统计功能,必须在平台上再增加一个MongDB-Engine:数据平台用户通过传入一种Context来指示MongoDB-Engine运算。与前面JDBC-Engine和Cassandra-Engine通过文本式传递指令不同的是:MangoDB没有一套像SQL或CQL这样的文本式编程语言。但MangoDB基本上都是通过Bson类型的参数进行运算的,Bson是个java interface:

    /**
     * An interface for types that are able to render themselves into a {@code BsonDocument}.
     *
     * @since 3.0
     */
    public interface Bson {
        /**
         * Render the filter into a BsonDocument.
         *
         * @param documentClass the document class in scope for the collection.  This parameter may be ignored, but it may be used to alter
         *                      the structure of the returned {@code BsonDocument} based on some knowledge of the document class.
         * @param codecRegistry the codec registry.  This parameter may be ignored, but it may be used to look up {@code Codec} instances for
         *                      the document class or any other related class.
         * @param <TDocument> the type of the document class
         * @return the BsonDocument
         */
        <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
    }

    任何实现Bson的类型都可以通过toBsonDocument来进行Bson到Document的转换。下面是Filter类型的例子:

       private static final class OperatorFilter<TItem> implements Bson {
            private final String operatorName;
            private final String fieldName;
            private final TItem value;
    
            OperatorFilter(final String operatorName, final String fieldName, final TItem value) {
                this.operatorName = notNull("operatorName", operatorName);
                this.fieldName = notNull("fieldName", fieldName);
                this.value = value;
            }
    
            @Override
            public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> documentClass, final CodecRegistry codecRegistry) {
                BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
    
                writer.writeStartDocument();
                writer.writeName(fieldName);
                writer.writeStartDocument();
                writer.writeName(operatorName);
                encodeValue(writer, value, codecRegistry);
                writer.writeEndDocument();
                writer.writeEndDocument();
    
                return writer.getDocument();
            }
    
            @Override
            public String toString() {
                return operatorFilterToString(fieldName, operatorName, value);
            }
        }

    对于MongoDB-Engine,我们需要实现下面这几大类功能才能满足平台要求,包括:

    count
    disctinct
    aggregate
    mapReduce
    bulkWrite
    insert
    delete
    replace
    update

    具体函数细节和功能描述如下:

      count:
       /**
       * Counts the number of documents in the collection.
       * @return a Observable with a single element indicating the number of documents
       */
      def count(): SingleObservable[Long] = observeLong(wrapped.count(_: SingleResultCallback[java.lang.Long]))
    
      /**
       * Counts the number of documents in the collection according to the given options.
       * @param filter  the query filter
       * @param options the options describing the count
       * @return a Observable with a single element indicating the number of documents
       */
      def count(filter: Bson, options: CountOptions): SingleObservable[Long] =
        observeLong(wrapped.count(filter, options, _: SingleResultCallback[java.lang.Long]))
    
      distinct:
        /**
       * Gets the distinct values of the specified field name.
       * @param fieldName the field name
       * @tparam C       the target type of the observable.
       * @return a Observable emitting the sequence of distinct values
       */
      def distinct[C](fieldName: String)(implicit ct: ClassTag[C]): DistinctObservable[C] =
        DistinctObservable(wrapped.distinct(fieldName, ct))
    
      /**
       * Gets the distinct values of the specified field name.
       * @param fieldName the field name
       * @param filter  the query filter
       * @tparam C       the target type of the observable.
       * @return a Observable emitting the sequence of distinct values
       */
      def distinct[C](fieldName: String, filter: Bson)(implicit ct: ClassTag[C]): DistinctObservable[C] =
        DistinctObservable(wrapped.distinct(fieldName, filter, ct))
      
      find:
        /**
       * Finds all documents in the collection.
       * @tparam C   the target document type of the observable.
       * @return the find Observable
       */
      def find[C]()(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): FindObservable[C] =
        FindObservable(wrapped.find[C](ct))
    
      /**
       * Finds all documents in the collection.
       * @param filter the query filter
       * @tparam C    the target document type of the observable.
       * @return the find Observable
       */
      def find[C](filter: Bson)(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): FindObservable[C] =
        FindObservable(wrapped.find(filter, ct))
    
      aggregate:
       /**
       * Aggregates documents according to the specified aggregation pipeline.
       * @param pipeline the aggregate pipeline
       * @return a Observable containing the result of the aggregation operation
       *         [[http://docs.mongodb.org/manual/aggregation/ Aggregation]]
       */
      def aggregate[C](pipeline: Seq[Bson])(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): AggregateObservable[C] =
        AggregateObservable(wrapped.aggregate[C](pipeline.asJava, ct))
    
      mapReduce:
        /**
       * Aggregates documents according to the specified map-reduce function.
       * @param mapFunction    A JavaScript function that associates or "maps" a value with a key and emits the key and value pair.
       * @param reduceFunction A JavaScript function that "reduces" to a single object all the values associated with a particular key.
       * @tparam C            the target document type of the observable.
       * @return a Observable containing the result of the map-reduce operation
       */
      def mapReduce[C](mapFunction: String, reduceFunction: String)(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): MapReduceObservable[C] =
        MapReduceObservable(wrapped.mapReduce(mapFunction, reduceFunction, ct))
    
      bulkWrite:
        /**
       * Executes a mix of inserts, updates, replaces, and deletes.
       * @param requests the writes to execute
       * @return a Observable with a single element the BulkWriteResult
       */
      def bulkWrite(requests: Seq[_ <: WriteModel[_ <: TResult]]): SingleObservable[BulkWriteResult] =
        observe(wrapped.bulkWrite(
          requests.asJava.asInstanceOf[util.List[_ <: WriteModel[_ <: TResult]]],
          _: SingleResultCallback[BulkWriteResult]
        ))
    
      /**
       * Executes a mix of inserts, updates, replaces, and deletes.
       * @param requests the writes to execute
       * @param options  the options to apply to the bulk write operation
       * @return a Observable with a single element the BulkWriteResult
       */
      def bulkWrite(requests: Seq[_ <: WriteModel[_ <: TResult]], options: BulkWriteOptions): SingleObservable[BulkWriteResult] =
        observe(wrapped.bulkWrite(
          requests.asJava.asInstanceOf[util.List[_ <: WriteModel[_ <: TResult]]],
          options,
          _: SingleResultCallback[BulkWriteResult]
        ))
      
      insert:
       /**
       * Inserts the provided document. If the document is missing an identifier, the driver should generate one.
       * @param document the document to insert
       * @return a Observable with a single element indicating when the operation has completed or with either a
       *         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
       */
      def insertOne(document: TResult): SingleObservable[Completed] = observeCompleted(wrapped.insertOne(document, _: SingleResultCallback[Void]))
    
      /**
       * Inserts the provided document. If the document is missing an identifier, the driver should generate one.
       * @param document the document to insert
       * @param options  the options to apply to the operation
       * @return a Observable with a single element indicating when the operation has completed or with either a
       *         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
       */
      def insertOne(document: TResult, options: InsertOneOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.insertOne(document, options, _: SingleResultCallback[Void]))
    
      /**
       * Inserts a batch of documents. The preferred way to perform bulk inserts is to use the BulkWrite API. However, when talking with 
       * server 2.6, using this method will be faster due to constraints in the bulk API related to error handling.
       * @param documents the documents to insert
       * @return a Observable with a single element indicating when the operation has completed or with either a
       *         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
       */
      def insertMany(documents: Seq[_ <: TResult]): SingleObservable[Completed] =
        observeCompleted(wrapped.insertMany(documents.asJava, _: SingleResultCallback[Void]))
    
      /**
       * Inserts a batch of documents. The preferred way to perform bulk inserts is to use the BulkWrite API. However, when talking with 
       * server 2.6, using this method will be faster due to constraints in the bulk API related to error handling.
       * @param documents the documents to insert
       * @param options   the options to apply to the operation
       * @return a Observable with a single element indicating when the operation has completed or with either a
       *         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
       */
      def insertMany(documents: Seq[_ <: TResult], options: InsertManyOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.insertMany(documents.asJava, options, _: SingleResultCallback[Void]))
    
      delete:
       /**
       * Removes at most one document from the collection that matches the given filter.  If no documents match, the collection is not
       * modified.
       * @param filter the query filter to apply the the delete operation
       * @return a Observable with a single element the DeleteResult or with an com.mongodb.MongoException
       */
      def deleteOne(filter: Bson): SingleObservable[DeleteResult] = observe(wrapped.deleteOne(filter, _: SingleResultCallback[DeleteResult]))
    
      /**
       * Removes at most one document from the collection that matches the given filter.  If no documents match, the collection is not
       * modified.
       * @param filter the query filter to apply the the delete operation
       * @param options the options to apply to the delete operation
       * @return a Observable with a single element the DeleteResult or with an com.mongodb.MongoException
       */
      def deleteOne(filter: Bson, options: DeleteOptions): SingleObservable[DeleteResult] =
        observe(wrapped.deleteOne(filter, options, _: SingleResultCallback[DeleteResult]))
    
      /**
       * Removes all documents from the collection that match the given query filter.  If no documents match, the collection is not modified.
       * @param filter the query filter to apply the the delete operation
       * @return a Observable with a single element the DeleteResult or with an com.mongodb.MongoException
       */
      def deleteMany(filter: Bson): SingleObservable[DeleteResult] = observe(wrapped.deleteMany(filter, _: SingleResultCallback[DeleteResult]))
    
      /**
       * Removes all documents from the collection that match the given query filter.  If no documents match, the collection is not modified.
       * @param filter the query filter to apply the the delete operation
       * @param options the options to apply to the delete operation
       * @return a Observable with a single element the DeleteResult or with an com.mongodb.MongoException
       */
      def deleteMany(filter: Bson, options: DeleteOptions): SingleObservable[DeleteResult] =
        observe(wrapped.deleteMany(filter, options, _: SingleResultCallback[DeleteResult]))
     
      replace:
       /**
       * Replace a document in the collection according to the specified arguments.
       * [[http://docs.mongodb.org/manual/tutorial/modify-documents/#replace-the-document Replace]]
       * @param filter      the query filter to apply the the replace operation
       * @param replacement the replacement document
       * @return a Observable with a single element the UpdateResult
       */
      def replaceOne(filter: Bson, replacement: TResult): SingleObservable[UpdateResult] =
        observe(wrapped.replaceOne(filter, replacement, _: SingleResultCallback[UpdateResult]))
    
      /**
       * Replace a document in the collection according to the specified arguments.
       * @param filter      the query filter to apply the the replace operation
       * @param replacement the replacement document
       * @param options     the options to apply to the replace operation
       * @return a Observable with a single element the UpdateResult
       */
      def replaceOne(filter: Bson, replacement: TResult, options: UpdateOptions): SingleObservable[UpdateResult] =
        observe(wrapped.replaceOne(filter, replacement, options, _: SingleResultCallback[UpdateResult]))
    
      update:
       /**
       * Update a single document in the collection according to the specified arguments.
       * @param filter  a document describing the query filter, which may not be null. This can be of any type for which a `Codec` is
       *                registered
       * @param update  a document describing the update, which may not be null. The update to apply must include only update operators. This
       *                can be of any type for which a `Codec` is registered
       * @return a Observable with a single element the UpdateResult
       */
      def updateOne(filter: Bson, update: Bson): SingleObservable[UpdateResult] =
        observe(wrapped.updateOne(filter, update, _: SingleResultCallback[UpdateResult]))
    
      /**
       * Update a single document in the collection according to the specified arguments.
       * @param filter  a document describing the query filter, which may not be null. This can be of any type for which a `Codec` is
       *                registered
       * @param update  a document describing the update, which may not be null. The update to apply must include only update operators. This
       *                can be of any type for which a `Codec` is registered
       * @param options the options to apply to the update operation
       * @return a Observable with a single element the UpdateResult
       */
      def updateOne(filter: Bson, update: Bson, options: UpdateOptions): SingleObservable[UpdateResult] =
        observe(wrapped.updateOne(filter, update, options, _: SingleResultCallback[UpdateResult]))
      /**
       * Update documents in the collection according to the specified arguments.
       * @param filter  a document describing the query filter, which may not be null. This can be of any type for which a `Codec` is
       *                registered
       * @param update  a document describing the update, which may not be null. The update to apply must include only update operators. This
       *                can be of any type for which a `Codec` is registered
       * @return a Observable with a single element the UpdateResult
       */
      def updateMany(filter: Bson, update: Bson): SingleObservable[UpdateResult] =
        observe(wrapped.updateMany(filter, update, _: SingleResultCallback[UpdateResult]))

    可以看到:函数传入参数大致两个类型:Bson、XXOptions。我们还需要适度增加一些数据库管理功能,包括:

      createCollection
      listCollection
      dropCollection
      createIndex
      dropIndex
      createView

    具体函数细节和功能描述如下:

    dropCollection
     /**
       * Drops this collection from the Database.
       */
      def drop(): SingleObservable[Completed] = observeCompleted(wrapped.drop(_: SingleResultCallback[Void]))
    
    createIndex
      /**
       * @param key     an object describing the index key(s), which may not be null. This can be of any type for which a `Codec` is
       *                registered
       * @return a Observable with a single element indicating when the operation has completed
       */
      def createIndex(key: Bson): SingleObservable[String] =
        observe(wrapped.createIndex(key, _: SingleResultCallback[String]))
    
      /**
       * @param key     an object describing the index key(s), which may not be null. This can be of any type for which a `Codec` is
       *                registered
       * @param options the options for the index
       * @return a Observable with a single element indicating when the operation has completed
       */
      def createIndex(key: Bson, options: IndexOptions): SingleObservable[String] =
        observe(wrapped.createIndex(key, options, _: SingleResultCallback[String]))
      
      dropIndex
        /**
       * Drops the given index.
       * @param indexName the name of the index to remove
       * @return a Observable with a single element indicating when the operation has completed
       */
      def dropIndex(indexName: String): SingleObservable[Completed] = observeCompleted(wrapped.dropIndex(indexName, _: SingleResultCallback[Void]))
    
      /**
       * Drops the given index.
       * @param indexName the name of the index to remove
       * @param dropIndexOptions options to use when dropping indexes
       * @return a Observable with a single element indicating when the operation has completed
       */
      def dropIndex(indexName: String, dropIndexOptions: DropIndexOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.dropIndex(indexName, dropIndexOptions, _: SingleResultCallback[Void]))
    
      /**
       * Drops the index given the keys used to create it.
       * @param keys the keys of the index to remove
       * @return a Observable with a single element indicating when the operation has completed
       */
      def dropIndex(keys: Bson): SingleObservable[Completed] = observeCompleted(wrapped.dropIndex(keys, _: SingleResultCallback[Void]))
    
      /**
       * Drops the index given the keys used to create it.
       * @param keys the keys of the index to remove
       * @param dropIndexOptions options to use when dropping indexes
       * @return a Observable with a single element indicating when the operation has completed
       */
      def dropIndex(keys: Bson, dropIndexOptions: DropIndexOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.dropIndex(keys, dropIndexOptions, _: SingleResultCallback[Void]))
    
      /**
       * Drop all the indexes on this collection, except for the default on _id.
       * @return a Observable with a single element indicating when the operation has completed
       */
      def dropIndexes(): SingleObservable[Completed] =
        observeCompleted(wrapped.dropIndexes(_: SingleResultCallback[Void]))
    
      /**
       * Drop all the indexes on this collection, except for the default on _id.
       * @param dropIndexOptions options to use when dropping indexes
       * @return a Observable with a single element indicating when the operation has completed
       */
      def dropIndexes(dropIndexOptions: DropIndexOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.dropIndexes(dropIndexOptions, _: SingleResultCallback[Void]))
    
      listCollection
        /**
       * Finds all the collections in this database.
       * @tparam TResult the target document type of the iterable.
       * @return the fluent list collections interface
       */
      def listCollections[TResult]()(implicit e: TResult DefaultsTo Document, ct: ClassTag[TResult]): ListCollectionsObservable[TResult] = ListCollectionsObservable(wrapped.listCollections(ct))
    
     createCollection
        /**
       * Create a new collection with the given name.
       * @param collectionName the name for the new collection to create
       * @return a Observable identifying when the collection has been created
       */
      def createCollection(collectionName: String): SingleObservable[Completed] =
        observeCompleted(wrapped.createCollection(collectionName, _: SingleResultCallback[Void]))
    
      /**
       * Create a new collection with the selected options
       * @param collectionName the name for the new collection to create
       * @param options        various options for creating the collection
       * @return a Observable identifying when the collection has been created
       */
      def createCollection(collectionName: String, options: CreateCollectionOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.createCollection(collectionName, options, _: SingleResultCallback[Void]))
    
      createView
        /**
       * Creates a view with the given name, backing collection/view name, and aggregation pipeline that defines the view.
       * @param viewName the name of the view to create
       * @param viewOn   the backing collection/view for the view
       * @param pipeline the pipeline that defines the view
       */
      def createView(viewName: String, viewOn: String, pipeline: Seq[Bson]): SingleObservable[Completed] =
        observeCompleted(wrapped.createView(viewName, viewOn, pipeline.asJava, _: SingleResultCallback[Void]))
    
      /**
       * Creates a view with the given name, backing collection/view name, aggregation pipeline, and options that defines the view.
       * @param viewName          the name of the view to create
       * @param viewOn            the backing collection/view for the view
       * @param pipeline          the pipeline that defines the view
       * @param createViewOptions various options for creating the view
       */
      def createView(viewName: String, viewOn: String, pipeline: Seq[Bson], createViewOptions: CreateViewOptions): SingleObservable[Completed] =
        observeCompleted(wrapped.createView(viewName, viewOn, pipeline.asJava, createViewOptions, _: SingleResultCallback[Void]))

    下面就是根据以上需求分析初步做出的功能框架设计方案:

    import org.bson.conversions.Bson
    import org.mongodb.scala._
    
    trait MGOCommands
    
    object MGOCommands  {
      case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
      case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
      case class Find(filter: Option[Bson]) extends MGOCommands
      case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
      case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
      case class Insert(newdocs: Seq[Document], options: Option[Any]) extends MGOCommands
      case class Delete(filter: Bson, options: Option[Any], onlyOne: Boolean = false) extends MGOCommands
      case class Replace(filter: Bson, replacement: Document, options: Option[Any]) extends MGOCommands
      case class Update(filter: Bson, update: Bson, options: Option[Any]) extends MGOCommands
      case class BulkWrite(commands: Seq[MGOCommands], options: Option[Any]) extends MGOCommands
    }
    
    object MGOAdmins {
      case class DropCollection(collName: String) extends MGOCommands
      case class CreateCollection(collName: String, options: Option[Any]) extends MGOCommands
      case class ListCollection(dbName: String) extends MGOCommands
      case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any]) extends MGOCommands
      case class CreateIndex(filter: Option[Bson], options: Option[Any]) extends MGOCommands
      case class DropIndexByName(indexName: String, options: Option[Any]) extends MGOCommands
      case class DropIndexByKey(key: Bson, options: Option[Any]) extends MGOCommands
      case class DropAllIndexes(options: Option[Any]) extends MGOCommands
    }
    
    case class MGOContext (
         dbName: String,
         collName: String,
         action: MGOCommands = null
    ) { ctx =>
      def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
      def setCollName(name: String): MGOContext = ctx.copy(collName = name)
      def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd)
    }
    object MGOContext {
      def apply(db: String, coll: String) = new MGOContext(db,coll)
      def apply(db: String, coll: String, command: MGOCommands) =
        new MGOContext(db,coll,command)
    
    }
  • 相关阅读:
    docker添加sudo权限
    服务器出口ip
    flask
    ACM-奇特的立方体
    ACM-牛喝水
    ACM-可乐兑换
    ACM-Work Assignment
    ACM-DFS Template
    ACM-Checker Challenge
    ACM-Divide Tree
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/8527681.html
Copyright © 2011-2022 走看看