zoukankan      html  css  js  c++  java
  • HBase(二): c#访问HBase之股票行情Demo

         上一章完成了c#访问hbase的sdk封装,接下来以一个具体Demo对sdk进行测试验证。场景:每5秒抓取指定股票列表的实时价格波动行情,数据下载后,一方面实时刷新UI界面,另一方面将数据放入到在内存中模拟的MQ (实际生产情况,可用kafka等集群代替)->存入HBase数据库。提供按指定时间范围股票价格数据查询。

    目录:

    • 示例说明
    • 示例效果图
    • rest server运行状态检查
    • 获取股票实时数据代码
    • 数据持续化至Hbase代码
    • 从HBase读取数据代码

    示例说明:

    • 在Hbase 中创建两个表,分别为:
    1. StocksInfo (股票信息表,用来存储设置的股票代码、股票名称)
    2. StockRealInfo (股票实时行情数据,包含开盘价、当前价、最高价、最低价、五档竞买、卖单价和数量、成交单价、数量、涨跌幅等)
    • 每5秒钟抓取StocksInfo表中所有股票的数据,自动更新UI,持续化到HBase;支持增加、删除要监控的股票列表。
    • 提供按指定时间范围从hbase中查询历史数据

    示例效果图:

    • 历史数据查询:

    rest server运行状态检查:

    • 在 HDP2.4安装(五):集群及组件安装 章节,Hbase 主机安装在 hdp4 192.168.2.21 上,使用xshell 工具连接到hbase master(hdp4)
    • 查看8080端口是否正常,也可从 ambari UI 界面查看HBase状态,如图:

    获取股票实时数据代码:

    •  好多的网站提供股票实时交易数据的下载,我选择的是从 hq.sina 下载,注意抓取数据的频度不要设置的太高,否则你的IP可能会被封掉,代码如下:
      public class SnatchFormSina 
          {
              #region SnatchFormSina
      
              HttpClient client;
      
              private const string dataurl = "http://hq.sinajs.cn/list={0}";
      
              public SnatchFormSina()
              {
                  this.client = new HttpClient();
              }
      
              /// <summary>
              /// 
              /// </summary>
              public static SnatchFormSina Current
              {
                  get {
                      return new SnatchFormSina();
                  }
              }
      
              #endregion
      
              #region GetCurrentInfos
      
              /// <summary>
              /// 
              /// </summary>
              /// <param name="stockIDs"></param>
              /// <returns></returns>
              public async Task<List<StockRealInfo>> GetCurrentInfosAsync(List<string> stockIDs)
              {
                  List<StockRealInfo> list = new List<StockRealInfo>();
                  string dataUrl = this.ParseStockIDs(stockIDs);
                  dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);
      
                  string realInfo = await this.client.GetStringAsync(dataUrl);
                  string[] infos = realInfo.Split('
      ');
      
                  StockRealInfo stockInfo;
                  foreach (string info in infos)
                  {
                      if (string.IsNullOrEmpty(info))
                          continue;
      
                      stockInfo = new StockRealInfo(info);
                      stockInfo.ID = SimulatorCache.StockAccount[stockInfo.Name];
                      SimulatorCache.StockInfos[stockInfo.ID] = stockInfo;
                      list.Add(stockInfo);
                  }
      
                  return list;
              }
      
              #endregion
      
              #region ParseStockIDs
      
              /// <summary>
              /// 
              /// </summary>
              /// <param name="stockIDs"></param>
              /// <returns></returns>
              private string ParseStockIDs(List<string> stockIDs)
              {
                  StringBuilder sb = new StringBuilder();
                  foreach(string id in stockIDs)
                  { 
                      if (id.Substring(0, 2) == "60")//上海是600打头
                      {
                          sb.Append(string.Format("sh{0},", id));
                      }
                      else if (id.Substring(0, 2) == "51")//上海基金
                      {
                          sb.Append(string.Format("sh{0},", id));
                      }
                      else //if (stockIDs.Substring(0, 2) == "00")//深圳
                      {
                          sb.Append(string.Format("sz{0},", id));
                      }
                  }
      
                  sb[sb.Length - 1].ToString().Replace(",", "");
      
                  return string.Format(dataurl, sb.ToString());
              }
      
              #endregion
      
              #region ValiateStockID
      
              /// <summary>
              /// 
              /// </summary>
              /// <param name="stockIDs"></param>
              /// <returns></returns>
              public async Task<string> ValiateStockID(string stockID)
              {
                  string name = string.Empty;
                  string dataUrl = this.ParseStockIDs(new List<string> { stockID });
                  dataUrl = dataUrl.Substring(0, dataUrl.Length - 1);
      
                  string realInfo = await this.client.GetStringAsync(dataUrl);
                  string[] infos = realInfo.Split('
      ');
      
                  StockRealInfo stockInfo;
                  foreach (string info in infos)
                  {
                      if (string.IsNullOrEmpty(info))
                          continue;
      
                      stockInfo = new StockRealInfo(info);
                      name = stockInfo.Name;
                  }
      
                  return name;
              }
      
              #endregion
          }
      View Code

    数据持续化到Hbase代码示例:

    • 代码中Utils.HBaseClient 是在一个工具类里面创建一个HBaseClient实例
      public class StockRealWriter
          {
              #region StockRealWriter
      
              Queue<StockRealInfo> queue = new Queue<StockRealInfo>();
      
              // use multithread write
              Thread writerThread;
              bool threadRunning = true;
      
              const string HBASESTOCKTBLNAME = "StockRealInfo";
      
              public StockRealWriter()
              {
                  // Start a thread for writting to HBase
                  Task task = new Task(WriterThreadFunction);
                  task.Start();
              }
      
              ~StockRealWriter()
              {
                  threadRunning = false;
              }
      
              #endregion
      
              #region WriterThreadFunction
      
              /// <summary>
              /// WriterThreadFunction
              /// </summary>
              public void WriterThreadFunction()
              {
                  while (threadRunning)
                  {
                      if (queue.Count > 0)
                      {
                          lock (queue)
                          {
                              CellSet set = new CellSet();
                              do
                              {
                                  StockRealInfo stock = queue.Dequeue();
                                  this.CreateStockByRealInfos(set, stock);
                              } while (queue.Count > 0);
      
                              Utils.HBaseClient.StoreCellsAsync(HBASESTOCKTBLNAME, set);
                          }
                      }
      
                      Thread.Sleep(5000);
                  }
              }
      
              #endregion
      
              #region CreateStockByRealInfos
      
      
              /// <summary>
              /// 
              /// </summary>
              /// <param name="set"></param>
              /// <param name="info"></param>
              private void CreateStockByRealInfos(CellSet set, StockRealInfo info)
              {
                  string key = string.Format("{0}_{1}_{2}", info.ID, info.Date, info.Time);
                  var row = new CellSet.Row { key = Encoding.UTF8.GetBytes(key) };
                  
                  var value = new Cell { column = Encoding.UTF8.GetBytes("d:ID"), data = Encoding.UTF8.GetBytes(info.ID) };
                  row.values.Add(value);
      
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Name"), data = Encoding.UTF8.GetBytes(info.Name) };
                  row.values.Add(value);
      
                  //今日开盘价
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:TodayOpen"), data = Encoding.UTF8.GetBytes(info.TodayOpen) };
                  row.values.Add(value);
      
                  //昨日收盘价
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:YesterdayClose"), data = Encoding.UTF8.GetBytes(info.YesterdayClose) };
                  row.values.Add(value);
      
                  //当前价格
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Current"), data = Encoding.UTF8.GetBytes(info.Current) };
                  row.values.Add(value);
      
                  //今日最高价
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:High"), data = Encoding.UTF8.GetBytes(info.High) };
                  row.values.Add(value);
      
                  //今日最低价
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Low"), data = Encoding.UTF8.GetBytes(info.Low) };
                  row.values.Add(value);
      
                  //竟买价 买1
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Buy"), data = Encoding.UTF8.GetBytes(info.Buy) };
                  row.values.Add(value);
      
                  //竟卖价 卖1
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Sell"), data = Encoding.UTF8.GetBytes(info.Sell) };
                  row.values.Add(value);
      
                  // 成交数 单位股数 通常除于100成为手
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:VolAmount"), data = Encoding.UTF8.GetBytes(info.VolAmount) };
                  row.values.Add(value);
      
                  //  成交多少钱,单位元
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:VolMoney"), data = Encoding.UTF8.GetBytes(info.VolMoney) };
                  row.values.Add(value);
      
                  //  日期
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Date"), data = Encoding.UTF8.GetBytes(info.Date) };
                  row.values.Add(value);
      
                  //  时间 
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Time"), data = Encoding.UTF8.GetBytes(info.Time) };
                  row.values.Add(value);
      
                  //  差额
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:Diff"), data = Encoding.UTF8.GetBytes(info.Diff) };
                  row.values.Add(value);
      
                  //  百分比
                  value = new Cell { column = Encoding.UTF8.GetBytes("d:DiffPrec"), data = Encoding.UTF8.GetBytes(info.DiffPrec) };
                  row.values.Add(value);
      
                  DataRow buyInfo;
                  for(int i=0;i<5;i++)
                  {
                      buyInfo = info.BuyList.Rows[i];
                      
                      value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Price0{0}",i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(buyInfo["Price"])) };
                      row.values.Add(value);
      
                      value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Amount0{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(buyInfo["Amount"])) };
                      row.values.Add(value);
                  }
      
                  DataRow sellInfo;
                  for (int i = 0; i < 5; i++)
                  {
                      sellInfo = info.SellList.Rows[i];
      
                      value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Price1{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(sellInfo["Price"])) };
                      row.values.Add(value);
      
                      value = new Cell { column = Encoding.UTF8.GetBytes(string.Format("d:Amount1{0}", i.ToString())), data = Encoding.UTF8.GetBytes(Convert.ToString(sellInfo["Amount"])) };
                      row.values.Add(value);
                  }
                  
                  set.rows.Add(row);
              }
      
              #endregion
      
              #region WriteStock
      
              /// <summary>
              /// 
              /// </summary>
              /// <param name="stockInfo"></param>
              public void WriteStock(List<StockRealInfo> stockInfos)
              {
                  lock (queue)
                  {
                      foreach(var stockInfo in stockInfos)
                      { 
                        queue.Enqueue(stockInfo);
                      }
                  }
              }
      
              #endregion
          }
      View Code

    从HBase读取数据代码:

    • 代码中 Scanner 参数是指设置的查询范围 (设置StartRow、EndRow、Batch等参数)
      public class StockRealReader
          {
              #region StockRealReader
      
              const string HBASESTOCKTBLNAME = "StockRealInfo";
      
              public StockRealReader()
              {
      
              }
      
              #endregion
      
              #region QueryStockRealAsync
      
              public async Task<List<StockRealInfo>> QueryStockRealAsync(Scanner query)
              {
                  List<StockRealInfo> list = new List<StockRealInfo>();
                  
                  ScannerInformation info = await Utils.HBaseClient.CreateScannerAsync(HBASESTOCKTBLNAME, query);
      
                  CellSet next;
                  while ((next = await Utils.HBaseClient.ScannerGetNextAsync(info)) != null)
                  {
                      StockRealInfo realInfo;
                      foreach (CellSet.Row row in next.rows)
                      {
                          realInfo = new StockRealInfo();
      
                          //开盘价
                          var temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:TodayOpen");
                          realInfo.TodayOpen = Encoding.UTF8.GetString(temp.data);
      
                          //昨日收盘价
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:YesterdayClose");
                          realInfo.YesterdayClose = Encoding.UTF8.GetString(temp.data);
      
                          //当前价格
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Current");
                          realInfo.Current = Encoding.UTF8.GetString(temp.data);
      
                          //今日最高价
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:High");
                          realInfo.High = Encoding.UTF8.GetString(temp.data);
      
                          //今日最低价
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Low");
                          realInfo.Low = Encoding.UTF8.GetString(temp.data);
      
                          //竟买价 买1
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Buy");
                          realInfo.Buy = Encoding.UTF8.GetString(temp.data);
      
                          //竟卖价 卖1
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Sell");
                          realInfo.Sell = Encoding.UTF8.GetString(temp.data);
      
                          //成交数 单位股数 通常除于100成为手
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:VolAmount");
                          realInfo.VolAmount = Encoding.UTF8.GetString(temp.data);
      
                          //成交多少钱,单位元
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:VolMoney");
                          realInfo.VolMoney = Encoding.UTF8.GetString(temp.data);
      
                          //日期
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Date");
                          realInfo.Date = Encoding.UTF8.GetString(temp.data);
      
                          //时间
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Time");
                          realInfo.Time = Encoding.UTF8.GetString(temp.data);
      
                          //差额
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:Diff");
                          realInfo.Diff = Encoding.UTF8.GetString(temp.data);
      
                          //百分比
                          temp = row.values.Find(c => Encoding.UTF8.GetString(c.column) == "d:DiffPrec");
                          realInfo.DiffPrec = Encoding.UTF8.GetString(temp.data);
      
                          list.Add(realInfo);
                      
                      }
                  }
      
                  return list;
              }
      
              #endregion
          }
      View Code
  • 相关阅读:
    按顺序触发事件LazyMan deepcopy
    requirejs学习,demo下载学习
    滑动删除demo
    jquery1.7.2的源码分析(四)$.Deferred(2)
    jquery1.7.2的源码分析(三)$.Deferred
    jquery1.7.2的源码分析(二)
    jquery1.7.2的源码分析(一)
    解码H264文件的一些基础知识
    和 的区别
    Jmeter Cookie管理器 获取JSESSIONID
  • 原文地址:https://www.cnblogs.com/tgzhu/p/5755353.html
Copyright © 2011-2022 走看看