IoT之TCP处理(二)

2017-10-10 12:36:45 物联网 3875 0

运行前需要的包

import (
  "net"
  "fmt"
  "time"
  "github.com/garyburd/redigo/redis"
  "./ssc" // 这个是以ssc为命名空间的各种自定义的包目录,包括逻辑、配置、协议、日志、工具文件
)

监听端口

__port := `1200`
l, err := net.Listen("tcp", "0.0.0.0:" + __port)
// 结束当前函数时,关闭监听
defer l.Close()

接受请求

for {
  c, err := l.Accept() // 接收到新的连接请求前,程序会阻塞在此行代码
  if err!= nil {
    continue
  }
  // 协程【可以当成异步】,处理tcp请求
  go HandleConn(c)
}

单个连接的相关处理

/**
* 处理tcp请求
* @param  net.Conn  conn  tcp连接的句柄
*/
func HandleConn(conn net.Conn) {
  redis_conn := ssc.ConnectRedis()  // 初始化一个 redis 连接

  // 监听当前连接,发送过来的数据
  r_flag:= make(chan int)     // 不为空时 => 结束当前redis
  flag  := make(chan int)     // 不为空时 => 有可返回数据
  msg   := make(chan []byte, 10)  // 待直接回复用户的数据 ,缓存 10 条消息
  data  := make([]byte, 2048) // 设置数据缓冲区 2KB

  // 查询是否有处理完的数据
  conn.Read(data) // 第一次的包 => 心跳包 ,获取设备号,Redis读自己设备的队列用
  go DataInRedis(redis_conn, msg, flag, r_flag , data )
  // 回复数据
  go DataReply(conn, msg, flag, r_flag, redis_conn)

  // 接收数据
  for {
    n, err := conn.Read(data)  
    ssc.Log(`HandleConn;收到数据了,是不是暂停了呢 n:`+  strconv.Itoa(n), 1)  

    if err != nil {
      ssc.Log(`客户端已断开连接` , 1)  
      return 
    } else if  n > 0  {
      go DataHandle(data, msg, flag)  // 处理数据 => 存储请求,后期格式化  
    } else  {
      ssc.Log(`数据长度不对` , 1)  
    }


  }

}

查询设备控制消息

/**
* 查询是否有处理完的数据
* @param  redis.Conn  redis_conn  待处理的数据
* @param  []byte      msg         channel数据,临时存储数据
* @param  int         flag        channel数据,处理临时数据的标志
* @param  int         r_flag      channel数据,结束当前redis的标志
* @param  []byte      data        待处理的心跳数据,用来获取设备号
*/
func DataInRedis(c redis.Conn, msg chan []byte, flag chan int, r_flag chan int, data []byte){
  pro, err := ssc.ProtocolUnpack( data )
  // 清空 断线期间 的数据
  c.Do("del", "device:list:" + pro.Code)

  go DataHandle(data, msg, flag)  // 同步设备数据

  if err != `` {
    ssc.Log(err, 5)
    return
  }else{

    for{
      v, _ := redis.String(  c.Do("rpop", "device:list:" + pro.Code)  )  // 读取出来自服务器端封装的消息
      ssc.Log(`Redis中的数据`, 1)
      ssc.Log(v, 1)
      if len(v) > 0 {
        // 协程,处理数据
        go DataHandle( ssc.HexToByte(v), msg, flag )
      }
      select{
        // Program interrupted ?
        case <- r_flag:
          close(r_flag)
          return
        // 阻塞1秒钟 If r_flag
        case <- time.After(time.Second * 1):
          break
      }
    }

  }
}

发送消息给设备

将确切的控制对应设备的消息 直接发给对应设备
而且,若十秒内没收到对应消息,则视为连接断开
必须让对应连接的所有资源【redis连接、mysql连接、goroutine】都关掉
特别是注意带有for相关的地方
否则容易导致资源浪费

Q:最容易看到自己的程序是否存在浪费的现象呢?

A:Linux下使用top命令查看对应进程CPU占用率,单核1G情况下
a.若处理过几条连接。使用率超过1%[编译后不超过0.5%],可以检查下代码中的for相关地方,是否有处理上的问题
b.批量模拟连接Client,并连接请求过一次后关闭,查看CPU占用率,查看各种使用资源的地方,是否在连接结束后都关闭了

/**
* 回复数据
* @param  net.Conn  conn    tcp连接的句柄  
* @param  []byte    msg     channel数据,临时存储数据
* @param  int       flag    channel数据,处理临时数据的标志
* @param  int       r_flag  channel数据,结束当前redis的标志
* @param  redis.Conn  redis_conn  待处理的数据
*/
func DataReply(conn net.Conn,  msg chan []byte, flag chan int, r_flag chan int, redis_conn redis.Conn) {
  for{
    select {
      // 如果收到数据
      case <- flag:
        data := <- msg // 处理收到的数据
        conn.Write(  data  ) // 处理结果
        ssc.Log(`DataReply is ok`, 2)
        conn.SetDeadline(  time.Now().Add(time.Duration( 5 ) * time.Second)  )  // 设置单次超时秒数
        break
      // 心跳:十秒没就断开
      case <- time.After(time.Second * 18):
        r_flag <- 1 // 提醒程序中断redis循环
        redis_conn.Close()
        conn.Close()
        close(msg)
        close(flag)
        ssc.Log(`One Device has been disconneted!`, 2)
        return 
    }
  }
}

连通测试工具

telnetncnetserver

下篇内容

Golang实现相关协议包的封装与解包

注:若无特殊说明,文章均为云天河原创,请尊重作者劳动成果,转载前请一定要注明出处