美文网首页
使用Netty实现汉王考勤机云平台

使用Netty实现汉王考勤机云平台

作者: 删我丶 | 来源:发表于2019-08-25 15:37 被阅读0次

开发场景

由于公司需要使用汉王考勤机,并需要将考勤内容信息存储至数据库,并提供接口给其他系统平台使用。
汉王官方提供的JavaApi为多线程+io流处理,考勤机数量多时会出现数据混乱现象,所以有此想法改为Netty实现。

开发所用技术

SpringBoot2.1.4+netty+mysql5.7+Quartz+Freemarker(代码生成器使用)

开发实现

由于汉王考勤机相当于客户端,所以后台只需要实现服务端即可。一切代码遵循汉王官网文档的协议内容及传输接收方式。

UDP协议

  • UPPServer类
package com.hanwang.config;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.concurrent.Future;

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
@Component
public class NettyUDPServer {
 
     
        private static final EventLoopGroup group = new NioEventLoopGroup(1);
     
        @Autowired
        UDPServerChannelInitializer serverChannelInitializer;
     
        @Value("${port}")
        private int port;
     
        //监听端口的通道,即server的处理通道
        private Channel channel;
     
        /**
         * 开启udp server服务
         *
         * @return
         * @throws InterruptedException 
         */
        public ChannelFuture start() throws InterruptedException{
            //启动类
            Bootstrap serverBootstrap = new Bootstrap();
            serverBootstrap.group(group)//组配置,初始化ServerBootstrap的线程组
                    .channel(NioDatagramChannel.class)//数据包通道,udp通道类型
                    //支持广播
                    .handler(serverChannelInitializer);//通道处理者
            //Future:异步任务的生命周期,可用来获取任务结果
            ChannelFuture channelFuture1 = serverBootstrap.bind(port).sync();//绑定端口,开启监听,同步等待
            if (channelFuture1 != null && channelFuture1.isSuccess()) {
                System.out.println("[UDP] server start success, port = {}");
                channel = channelFuture1.channel();//获取通道
            } else {
                channelFuture1.cause().printStackTrace();
            }
            return channelFuture1;
        }
     
        /**
         * 停止udp server服务
         * 销毁前的拦截
         */
        @PreDestroy
        public void destroy() {
            try {
                if (channel != null) {
                    ChannelFuture await = channel.close().await();
                    if (!await.isSuccess()) {
                    }
                }
                Future<?> future1 = group.shutdownGracefully().await();
                if (!future1.isSuccess()) {
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
}
  • UPPServer启动类
package com.hanwang.config;

import io.netty.channel.ChannelFuture;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.hanwang.service.KqptCommandService;
import com.hanwang.service.KqptDevicesService;

@Component
public class NettyUDPServerRun implements ApplicationRunner{
    
    @Autowired
    NettyUDPServer nettyUdpServer;
    
    @Value("${port}")
    private int PORT;
    
    @Autowired
    private KqptDevicesService kqptDevicesService;
    @Autowired
    private KqptCommandService commandService;
    
    
    @Override
    public void run(ApplicationArguments args){
        try {
            //启动服务端
            ChannelFuture start = nettyUdpServer.start();
            start.channel();
            //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
            //start.channel().closeFuture().syncUninterruptibly();
            
        } catch (Exception e) {
            
        }
        
    }

    
}
  • UPDServer通道处理类
package com.hanwang.config;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.hanwang.entity.KqptCommand;
import com.hanwang.entity.KqptDevices;
import com.hanwang.service.KqptCommandService;
import com.hanwang.service.KqptDevicesService;
import com.hanwang.splash.FaceId_Item;
import com.hanwang.utils.DateUtil;
 
/**
 * description: 通道数据输入的处理
 **/
@Component
@ChannelHandler.Sharable
public class UDPServerChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    
    @Autowired
    private KqptDevicesService kqptDevicesService;
    @Autowired
    private KqptCommandService commandService;
 
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
        System.out.println("[UDP] server 收到的消息:" + datagramPacket.content().toString(CharsetUtil.UTF_8));
        String message = datagramPacket.content().toString(CharsetUtil.UTF_8);
            //获取sbsn
            String sbsn = FaceId_Item.GetKeyValue(message, "sn");
            //从数据库kqpt_devices中根据sbsn获取设备监控信息
            KqptDevices kqptDevices = kqptDevicesService.selectBySbsn(sbsn);
            if(kqptDevices!=null){
                if(kqptDevices.getStatus()==null || kqptDevices.getStatus()=="" || kqptDevices.getStatus()!="00" || (!kqptDevices.getStatus().equals("00"))){
                    kqptDevices.setStatus("00");
                }
                kqptDevices.setLastTime(DateUtil.getNow());
                kqptDevicesService.update(kqptDevices);
                //是否下发指令
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("sbSn", sbsn);
                map.put("nowTime", DateUtil.getNow());
                List<KqptCommand> list = null;
                list =  commandService.selectWzxByComidOrSbsn(map);
                if(list.size()>0){
                        //向客户端下发指令-告知有未执行的命令
                        DatagramPacket datagramPacket1 = new DatagramPacket(Unpooled.copiedBuffer("PostRequest()", Charset.forName("GBK")), datagramPacket.sender());
                        channelHandlerContext.channel().writeAndFlush(datagramPacket1);
                }
                
            }
    }
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
 
}

  • UPDServer通道初始化,用于处理字符串编码
package com.hanwang.config;


 
import java.nio.charset.Charset;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
/**
 * description: 通道初始化,主要用于设置各种Handler
 **/
@Component
public class UDPServerChannelInitializer extends ChannelInitializer<NioDatagramChannel> {
 
    @Autowired
    UDPServerChannelInboundHandler serverChannelHandler;
 
    @Override
    protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
        ChannelPipeline pipeline = nioDatagramChannel.pipeline();
        pipeline.addLast(new StringDecoder());
        //自定义的InboundHandler输入处理者
        //pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        //字符串编解码器
        pipeline.addLast(
                new StringDecoder(CharsetUtil.UTF_8),
                new StringEncoder(Charset.forName("GBK"))
                );
        pipeline.addLast("serverChannelHandler", serverChannelHandler);
    }
}

TCP协议

  • TCPServer类
package com.hanwang.config;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
@Component
public class NettyTCPServer {
 
    //boss事件轮询线程组
    //处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源
    private EventLoopGroup boss = new NioEventLoopGroup(1);
    //worker事件轮询线程组
    //处理hadnler的工作线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2
    private EventLoopGroup worker = new NioEventLoopGroup();
 
    @Autowired
    TCPServerChannelInitializer serverChannelInitializer;
 
    @Value("${netty.tcp.server.port}")
    private Integer port;
 
    //与客户端建立连接后得到的通道对象
    private Channel channel;
 
    /**
     * 存储client的channel
     * key:ip,value:Channel
     */
    public static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();
 
    /**
     * 开启Netty tcp server服务
     *
     * @return
     */
    public ChannelFuture start() {
        //启动类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)//组配置,初始化ServerBootstrap的线程组
                .channel(NioServerSocketChannel.class)///构造channel通道工厂//bossGroup的通道,只是负责连接
                .childHandler(serverChannelInitializer)//设置通道处理者ChannelHandler////workerGroup的处理器
                .option(ChannelOption.SO_BACKLOG, 1024)//socket参数,当服务器请求处理程全满时,用于临时存放已完成三次握手请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                .childOption(ChannelOption.SO_KEEPALIVE, true);//启用心跳保活机制,tcp,默认2小时发一次心跳
        //Future:异步任务的生命周期,可用来获取任务结果
        ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//绑定端口,开启监听,同步等待
        if (channelFuture1 != null && channelFuture1.isSuccess()) {
            channel = channelFuture1.channel();//获取通道
        } else {
        }
        return channelFuture1;
    }
 
    /**
     * 停止Netty tcp server服务
     */
    @PreDestroy
    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        try {
            Future<?> future = worker.shutdownGracefully().await();
            if (!future.isSuccess()) {
            }
            Future<?> future1 = boss.shutdownGracefully().await();
            if (!future1.isSuccess()) {
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

  • TCPServer启动类
package com.hanwang.config;

import io.netty.channel.ChannelFuture;

import java.io.IOException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;


@Component
public class NettyTCPServerRun implements ApplicationRunner {

    @Autowired
    NettyTCPServer nettyTcpServer;

    @Override
    public void run(ApplicationArguments args) throws IOException {
        try {
             //启动服务端
            ChannelFuture start = nettyTcpServer.start();
            
            start.channel();
            //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
            //start.channel().closeFuture().syncUninterruptibly();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
  • TCPServer通道类
package com.hanwang.config;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.hanwang.entity.KqptCommand;
import com.hanwang.entity.KqptDevices;
import com.hanwang.entity.KqptEmployee;
import com.hanwang.entity.KqptKqrecords;
import com.hanwang.service.KqptCommandService;
import com.hanwang.service.KqptDevicesService;
import com.hanwang.service.KqptEmployeeService;
import com.hanwang.service.KqptKqrecordsService;
import com.hanwang.splash.FaceId_Item;
import com.hanwang.utils.DateUtil;
import com.hanwang.utils.StringUtils;
 

@Component
@ChannelHandler.Sharable
public class TCPServerChannelHandler extends SimpleChannelInboundHandler<String> {
    
    // 设备号
    private static String serialNumber = null;
    // 设备号
    private static String GetEmployeeIDserialNumber = null;
    // 当前执行命令ID
    private static Integer commandId = 0;
    // 工号
    private static String empNo = null;
    // 姓名
    private static String name = null;
    
    
    @Autowired
    private KqptKqrecordsService kqptKqrecordsService; 
    @Autowired
    private KqptCommandService kqptCommandService;
    @Autowired
    private KqptEmployeeService kqptEmployeeService;
    @Autowired
    private KqptDevicesService kqptDevicesService;
 
    /**
     * 拿到传过来的msg数据,开始处理
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msgStr) throws Exception {
        
        System.out.println("Netty tcp server receive msg : " + msgStr);
        
        
        try{
        
        
        if(msgStr.startsWith("PostRecord"))
        {   
            // 答复已准备好接收考勤记录
            if (true)
            {
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\" postphoto=\"false\")"
                        ).toString().toString(), Charset.forName("GBK"));
                ctx.channel().writeAndFlush(buf);
            }
        }
        else if(msgStr.startsWith("Record"))
        {           
            // 服务器回应
            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\")"
                    ).toString().toString(), Charset.forName("GBK"));
            ctx.channel().writeAndFlush(buf);
        }
        else if(msgStr.startsWith("PostEmployee"))
        {   // 准备上传人员信息

            // 服务器回应
            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\")"
                    ).toString().toString(), Charset.forName("GBK"));
            ctx.channel().writeAndFlush(buf);
        }                
        else if(msgStr.startsWith("Employee"))
        {   // 读取人员信息

            // 服务器回应
            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Return(result=\"success\")"
                    ).toString().toString(), Charset.forName("GBK"));
            ctx.channel().writeAndFlush(buf);
        }
        //获取设备信息-GetDeviceInfo()
        else if(msgStr.startsWith("Return(result=\"success\"  time=")){
            KqptDevices kqptDevices = new KqptDevices();
            kqptDevices.setSbSn(FaceId_Item.GetKeyValue(msgStr, "dev_id"));
            kqptDevices.setMsg(msgStr);
            kqptDevices.setZcNum(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "real_faceregist")));
            kqptDevices.setSbNum(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "max_faceregist")));
            kqptDevicesService.update(kqptDevices);
        }
        // 下发命令
        else if (msgStr.startsWith("GetRequest"))
        {   
            //获取sbsn
            serialNumber = FaceId_Item.GetKeyValue(msgStr, "sn");
            
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("sbSn", serialNumber);
            map.put("nowTime", DateUtil.getNow());
            List<KqptCommand> list =  kqptCommandService.selectWzxByComidOrSbsn(map);
            if(list.size()>0){
                try {
                    commandId = list.get(0).getId();
                    if(commandId!=null && StringUtils.isNotEmpty(list.get(0).getComNr())){
                            if(list.get(0).getComNr().startsWith("AddNameTable")){
                                
                                empNo = StringUtils.subString(list.get(0).getComNr(), "(", "=");
                                name = StringUtils.subString(list.get(0).getComNr(), "=\"", "\")");
                                
                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                        ).toString().toString(), Charset.forName("GBK"));
                                ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                
                                //添加ChannelFutureListener以便在写操作完成后接收通知
                                cf.addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        //写操作完成,并没有错误发生
                                        if(future.isSuccess()){
                                            //删除对应用户信息
                                            kqptEmployeeService.delBySbsn(serialNumber,empNo);
                                            //处理从考勤机获取到的AddNameTable
                                            KqptEmployee emp =  new KqptEmployee();
                                            emp.setEmpNo(empNo);
                                            emp.setSbSn(serialNumber);
                                            emp.setName(name);
                                            emp.setInTime(DateUtil.getNow());
                                            kqptEmployeeService.save(emp);
                                            //命令执行成功
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("2");
                                                kqptCommand.setResult("success");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("GetDeviceInfo()"
                                                    ).toString().toString(), Charset.forName("GBK"));
                                            ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                        }else{
                                            //命令执行失败
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("3");
                                                kqptCommand.setResult("fail");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                        }
                                    }
                                });
                            }else if(list.get(0).getComNr().startsWith("DeleteEmployee")){
                            
                                empNo = FaceId_Item.GetKeyValue(list.get(0).getComNr(), "id");
                                
/*                              //获取链接实例
                                Channel channel = ctx.channel();
                                //创建一个持有数据的ByteBuf
                                //获取设备信息  GetDeviceInfo() 并存入数据库
                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                        ).toString().toString(), Charset.forName("GBK"));*/
                                //数据冲刷
                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                        ).toString().toString(), Charset.forName("GBK"));
                                ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                
                                //添加ChannelFutureListener以便在写操作完成后接收通知
                                cf.addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        //写操作完成,并没有错误发生
                                        if(future.isSuccess()){
                                            
                                            kqptEmployeeService.delBySbsn(serialNumber,empNo);
                                            //命令执行成功
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("2");
                                                kqptCommand.setResult("success");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("GetDeviceInfo()"
                                                    ).toString().toString(), Charset.forName("GBK"));
                                            ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                        }else{
                                            //命令执行失败
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("3");
                                                kqptCommand.setResult("fail");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                        }
                                    }
                                });
                            }else if(list.get(0).getComNr().startsWith("GetEmployeeID")){
                                GetEmployeeIDserialNumber = serialNumber;
                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                        ).toString().toString(), Charset.forName("GBK"));
                                ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                              //添加ChannelFutureListener以便在写操作完成后接收通知
                                cf.addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        //写操作完成,并没有错误发生
                                        if(future.isSuccess()){
                                            //命令执行成功
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("2");
                                                kqptCommand.setResult("success");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                        }else{
                                            //命令执行失败
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("3");
                                                kqptCommand.setResult("fail");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                        }
                                    }
                                });
                            }else{
                            
                                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append(list.get(0).getComNr()
                                        ).toString().toString(), Charset.forName("GBK"));
                                ChannelFuture cf =  ctx.channel().writeAndFlush(buf);
                                //添加ChannelFutureListener以便在写操作完成后接收通知
                                cf.addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        //写操作完成,并没有错误发生
                                        if(future.isSuccess()){
                                            //命令执行成功
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("2");
                                                kqptCommand.setResult("success");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                        }else{
                                            //命令执行失败
                                            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                                                KqptCommand kqptCommand = new KqptCommand();
                                                kqptCommand.setId(commandId);
                                                kqptCommand.setStatus("3");
                                                kqptCommand.setResult("fail");
                                                kqptCommand.setLastTime(DateUtil.getNow());
                                                kqptCommandService.update(kqptCommand);
                                                commandId = null;
                                            }
                                        }
                                    }
                                });
                            }
                    }else{
                        ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                                ).toString().toString(), Charset.forName("GBK"));
                        //数据冲刷
                        ctx.channel().writeAndFlush(buf);
                    }
                }catch (Exception e) {
                }
            }else{
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                        ).toString().toString(), Charset.forName("GBK"));
                //数据冲刷
                ctx.channel().writeAndFlush(buf);
            }
        }
        //保存考勤记录到数据库
        else if(msgStr.startsWith("Return(result=\"success\" dev_id=\"")){
            //暂存时间用于数据上传时间
            String nowTime = DateUtil.getNow();
            if(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "total"))>0){
                // 提取单条考勤记录
                List<KqptKqrecords> kqptKqrecordsList = new LinkedList<KqptKqrecords>();
                Pattern p = Pattern.compile("\\b(time=.+\\R(?:photo=\"[^\"]+\")*)");                
                Matcher m = p.matcher(msgStr);
                String devId = FaceId_Item.GetKeyValue(msgStr, "dev_id");   
                while(m.find())
                {                  
                    KqptKqrecords kqptKqrecords = new KqptKqrecords();
                    kqptKqrecords.setSbSn(devId);
                    kqptKqrecords.setEmpNo(FaceId_Item.GetKeyValue(m.group(1), "id"));
                    kqptKqrecords.setKqTime(FaceId_Item.GetKeyValue(m.group(1), "time"));
                    kqptKqrecords.setInTime(nowTime);
                    kqptKqrecords.setTbFlag("0");
                    kqptKqrecordsList.add(kqptKqrecords);
                }  
                kqptKqrecordsService.batchSave(kqptKqrecordsList);
                System.out.println("保存考勤信息成功");
            }
            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                    ).toString().toString(), Charset.forName("GBK"));
            //数据冲刷
            ctx.channel().writeAndFlush(buf);
        }
        //获取设备发起GetEmployeeID()指令
        else if(msgStr.startsWith("Return(result=\"success\" total=\"")){
            if(Integer.valueOf(FaceId_Item.GetKeyValue(msgStr, "total"))>0){
                FaceId_Item[] ItemCollection = FaceId_Item.GetAllItems(msgStr);
                if (ItemCollection != null) 
                {  
                    for (FaceId_Item item : ItemCollection) 
                    { 
                        if (item.name.equals("id"))
                        {   
                            KqptCommand command = new KqptCommand();
                            command.setSbSn(GetEmployeeIDserialNumber);
                            command.setComNr(new StringBuilder().append("GetEmployee(id=\"").append(item.value).append("\")").toString());
                            command.setStatus("0");
                            command.setCreateTime(DateUtil.getNow());
                            kqptCommandService.save(command);
                        }  
                    }
                }
            }else{
            
                ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                        ).toString().toString(), Charset.forName("GBK"));
                //数据冲刷
                ctx.channel().writeAndFlush(buf);
            }
        }
        else if(msgStr.startsWith("Return(result=\"success\" id=\"")){
            kqptEmployeeService.delBySbsn(serialNumber,FaceId_Item.GetKeyValue(msgStr, "id"));
            //处理从考勤机获取到的人员信息
            KqptEmployee employee =  new KqptEmployee();
            employee.setEmpNo(FaceId_Item.GetKeyValue(msgStr, "id"));
            employee.setSbSn(FaceId_Item.GetKeyValue(msgStr, "sn"));
            employee.setName(FaceId_Item.GetKeyValue(msgStr, "name"));
            employee.setAlgEdition(FaceId_Item.GetKeyValue(msgStr, "alg_edition"));
            employee.setCheckType(FaceId_Item.GetKeyValue(msgStr, "check_type"));
            employee.setFaceData(StringUtils.subString(msgStr, "face_data"));
            employee.setInTime(DateUtil.getNow());
            kqptEmployeeService.save(employee);
            System.out.println("人员信息保存成功"+FaceId_Item.GetKeyValue(msgStr, "name"));
            
            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                    ).toString().toString(), Charset.forName("GBK"));
            //数据冲刷
            ctx.channel().writeAndFlush(buf);
        }
        else if(msgStr.startsWith("Return(result=\"failed\"")){   
            //命令执行失败
            if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
                KqptCommand kqptCommand = new KqptCommand();
                kqptCommand.setId(commandId);
                kqptCommand.setStatus("3");
                kqptCommand.setResult("fail");
                kqptCommand.setLastTime(DateUtil.getNow());
                kqptCommandService.update(kqptCommand);
                commandId = null;
            }
            ByteBuf buf = Unpooled.copiedBuffer(new StringBuilder().append("Quit()"
                    ).toString().toString(), Charset.forName("GBK"));
            //数据冲刷
            ctx.channel().writeAndFlush(buf);
        }
        else if(msgStr.startsWith("Quit")){   // 结束会话
            
        }
    }
    catch (Exception ex)
    {
        //命令执行失败
        if(commandId!=null && commandId!=0 && (!commandId.equals("0"))){
            KqptCommand kqptCommand = new KqptCommand();
            kqptCommand.setId(commandId);
            kqptCommand.setStatus("3");
            kqptCommand.setResult("fail");
            kqptCommand.setLastTime(DateUtil.getNow());
            commandId = null;
            kqptCommandService.update(kqptCommand);
        }
    }
    }
 
    /**
     * 活跃的、有效的通道
     * 第一次连接成功后进入的方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("tcp client " + getRemoteAddress(ctx) + " connect success");
        //往channel map中添加channel信息
        NettyTCPServer.map.put(getIPString(ctx), ctx.channel());
    }
 
    /**
     * 不活动的通道
     * 连接丢失后执行的方法(client端可据此实现断线重连)
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //删除Channel Map中的失效Client
        NettyTCPServer.map.remove(getIPString(ctx));
        ctx.close();
    }
 
    /**
     * 异常处理
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        //发生异常,关闭连接
        System.out.println("引擎 {} 的通道发生异常,即将断开连接");
        ctx.close();//再次建议close
    }
 
    /**
     * 心跳机制,超时处理
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        String socketString = ctx.channel().remoteAddress().toString();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("Client: " + socketString + " READER_IDLE 读超时");
                ctx.disconnect();//断开
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("Client: " + socketString + " WRITER_IDLE 写超时");
                ctx.disconnect();
            } else if (event.state() == IdleState.ALL_IDLE) {
                System.out.println("Client: " + socketString + " ALL_IDLE 总超时");
                ctx.disconnect();
            }
        }
    }
 
    /**
     * 获取client对象:ip+port
     *
     * @param ctx
     * @return
     */
    public String getRemoteAddress(ChannelHandlerContext ctx) {
        String socketString = "";
        socketString = ctx.channel().remoteAddress().toString();
        return socketString;
    }
 
    /**
     * 获取client的ip
     *
     * @param ctx
     * @return
     */
    public String getIPString(ChannelHandlerContext ctx) {
        String ipString = "";
        String socketString = ctx.channel().remoteAddress().toString();
        int colonAt = socketString.indexOf(":");
        ipString = socketString.substring(1, colonAt);
        return ipString;
    }
 
}
  • TCPServer通道初始化类
package com.hanwang.config;

import java.nio.charset.Charset;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
/**
 * description: 通道初始化,主要用于设置各种Handler
 **/
@Component
public class TCPServerChannelInitializer extends ChannelInitializer<SocketChannel> {
 
    @Autowired
    TCPServerChannelHandler serverChannelHandler;
 
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        ByteBuf delimiter = Unpooled.copiedBuffer(")".getBytes());
        pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,false,delimiter));
        pipeline.addLast(
                new StringDecoder(Charset.forName("GBK")),
                new StringEncoder(Charset.forName("GBK"))
                );
        //字符串编解码器
        //自定义Handler
        pipeline.addLast("serverChannelHandler", serverChannelHandler);
    }
}

  • application.properties
  #禁用jmx
spring.jmx.enabled=false
#DB
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=
spring.datasource.username=
spring.datasource.password=
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=15
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.max-lifetime=18000000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1

#mapper
mybatis.mapper-locations: classpath:mapper/*.xml
#sql_log
logging.level.com.hanwang.mapper=debug

# UDP服务端端口
port=9924
# TCP服务端端口
netty.tcp.server.port=9922

写在最后

目前实现功能有,同步考勤,人脸信息,定时拉取考勤信息,定时拉取设备人员信息等。如有相同需求,欢迎一起探讨。项目目前停留版本时间为2019.04版本。

相关文章

网友评论

      本文标题:使用Netty实现汉王考勤机云平台

      本文链接:https://www.haomeiwen.com/subject/hnnhectx.html