netty之SpringBoot+Netty+Elasticsearch收集日志信息数据存储

前言


将大量的业务以及用户行为数据存储起来用于分析处理,但是由于数据量较大且需要具备可分析功能所以将数据存储到文件系统更为合理。尤其是一些互联网高并发级应用,往往数据库都采用分库分表设计,那么将这些分散的数据通过binlog汇总到一个统一的文件系统就显得非常有必要。

#开发环境
在这里插入图片描述
环境准备
windows安装包 下载
注意 es是以来java环境 所以需要安装jdk 支持1.7以上
es-hander下载可视化操作插件

@Document(indexName = "stack", type = "group_user")
public class User {@Idprivate String id;private String name;   //姓名private Integer age;   //年龄private String level;  //级别private Date entryDate;//时间private String mobile; //电话private String email;  //邮箱private String address;//地址public User(String id, String name, Integer age, String level, Date entryDate, String mobile, String email, String address) {this.id = id;this.name = name;this.age = age;this.level = level;this.entryDate = entryDate;this.mobile = mobile;this.email = email;this.address = address;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getLevel() {return level;}public void setLevel(String level) {this.level = level;}public Date getEntryDate() {return entryDate;}public void setEntryDate(Date entryDate) {this.entryDate = entryDate;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getEmail() {return email;}public void setEmail(String email) {this.email = email;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}
@Service("myServerHandler")
public class MyServerHandler extends ChannelInboundHandlerAdapter {private Logger logger = LoggerFactory.getLogger(MyServerHandler.class);@Autowiredprivate UserService userService;/*** 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {SocketChannel channel = (SocketChannel) ctx.channel();logger.info("链接报告开始");logger.info("链接报告信息:有一客户端链接到本服务端");logger.info("链接报告IP:{}", channel.localAddress().getHostString());logger.info("链接报告Port:{}", channel.localAddress().getPort());logger.info("链接报告完毕");}/*** 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("客户端断开链接{}", ctx.channel().localAddress().toString());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + JSON.toJSONString(msg));//接收数据写入到ElasticsearchTransportProtocol transportProtocol = (TransportProtocol) msg;userService.save((User) transportProtocol.getObj());}/*** 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();logger.info("异常信息:\r\n" + cause.getMessage());}}
@Component("nettyServer")
public class NettyServer {private Logger logger = LoggerFactory.getLogger(NettyServer.class);@Resourceprivate MyChannelInitializer myChannelInitializer;//配置服务端NIO线程组private final EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));private final EventLoopGroup childGroup = new NioEventLoopGroup();private Channel channel;public ChannelFuture bing(InetSocketAddress address) {ChannelFuture channelFuture = null;try {ServerBootstrap b = new ServerBootstrap();b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class)    //非阻塞模式.option(ChannelOption.SO_BACKLOG, 128).childHandler(myChannelInitializer);channelFuture = b.bind(address).syncUninterruptibly();channel = channelFuture.channel();} catch (Exception e) {logger.error(e.getMessage());} finally {if (null != channelFuture && channelFuture.isSuccess()) {logger.info("itstack-demo-netty server start done. {关注明哥,获取源码}");} else {logger.error("itstack-demo-netty server start error. {关注明哥,获取源码}");}}return channelFuture;}public void destroy() {if (null == channel) return;channel.close();parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}public Channel getChannel() {return channel;}}

public interface UserService {void save(User user);void deleteById(String id);User queryUserById(String id);Iterable<User> queryAll();Page<User> findByName(String name, PageRequest request);}

提供一个可拓展的操作实体表的接口


public interface UserRepository extends ElasticsearchRepository<User, String> {Page<User> findByName(String name, Pageable pageable);}
@Service("userService")
public class UserServiceImpl implements UserService {private UserRepository dataRepository;@Autowiredpublic void setDataRepository(UserRepository dataRepository) {this.dataRepository = dataRepository;}@Overridepublic void save(User user) {dataRepository.save(user);}@Overridepublic void deleteById(String id) {dataRepository.deleteById(id);}@Overridepublic User queryUserById(String id) {Optional<User> optionalUser = dataRepository.findById(id);return optionalUser.get();}@Overridepublic Iterable<User> queryAll() {return dataRepository.findAll();}@Overridepublic Page<User> findByName(String name, PageRequest request) {return dataRepository.findByName(name, request);}}
@RestController
public class NettyController {@Resourceprivate NettyServer nettyServer;@RequestMapping("/localAddress")public String localAddress() {return "nettyServer localAddress " + nettyServer.getChannel().localAddress();}}
@SpringBootApplication
public class Application implements CommandLineRunner {private Logger logger = LoggerFactory.getLogger(Application.class);@Value("${netty.host}")private String host;@Value("${netty.port}")private int port;@Resourceprivate NettyServer nettyServer;public static void main(String[] args) {System.setProperty("es.set.netty.runtime.available.processors", "false");SpringApplication.run(Application.class, args);}@Overridepublic void run(String... args) throws Exception {InetSocketAddress address = new InetSocketAddress(host, port);ChannelFuture channelFuture = nettyServer.bing(address);Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy()));channelFuture.channel().closeFuture().syncUninterruptibly();}}
## 服务端口
server.port = 8080## Netty服务端配置
netty.host = 127.0.0.1
netty.port = 7397## Elasticsearch配置{更换为自己的cluster-name、cluster-nodes}
spring.data.elasticsearch.cluster-name=es-itstack
spring.data.elasticsearch.cluster-nodes=127.0.0.1:9300
spring.data.elasticsearch.repositories.enabled=true

ApiTest.java *Netty客户端,用于向服务端发送数据

public class ApiTest {public static void main(String[] args) {System.out.println("hi 微信公众号:关注明哥");EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(workerGroup);b.channel(NioSocketChannel.class);b.option(ChannelOption.AUTO_READ, true);b.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//对象传输处理channel.pipeline().addLast(new ObjDecoder(TransportProtocol.class));channel.pipeline().addLast(new ObjEncoder(TransportProtocol.class));// 在管道中添加我们自己的接收数据实现方法channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {}});}});ChannelFuture f = b.connect("127.0.0.1", 7397).sync();System.out.println("itstack-demo-netty client start done. {关注明哥,获取源码}");TransportProtocol tp1 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李小明", 1, "T0-1", new Date(), "13566668888", "184172133@qq.com", "北京"));TransportProtocol tp2 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "张大明", 2, "T0-2", new Date(), "13566660001", "huahua@qq.com", "南京"));TransportProtocol tp3 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李书鹏", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));TransportProtocol tp4 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "韩小雪", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "榆树"));TransportProtocol tp5 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "董叔飞", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "河北"));TransportProtocol tp6 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "候明相", 2, "T5-1", new Date(), "13566660002", "xiaobai@qq.com", "下花园"));TransportProtocol tp7 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "田明明", 2, "T3-1", new Date(), "13566660002", "xiaobai@qq.com", "东平"));TransportProtocol tp8 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "王大伟", 2, "T4-1", new Date(), "13566660002", "xiaobai@qq.com", "西湖"));TransportProtocol tp9 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "李雪明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "南昌"));TransportProtocol tp10 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "朱小飞", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "吉林"));TransportProtocol tp11 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "牛大明", 2, "T1-1", new Date(), "13566660002", "xiaobai@qq.com", "长春"));TransportProtocol tp12 = new TransportProtocol(1, new User(UUID.randomUUID().toString(), "关雪儿", 2, "T2-1", new Date(), "13566660002", "xiaobai@qq.com", "深圳"));//向服务端发送信息f.channel().writeAndFlush(tp1);f.channel().writeAndFlush(tp2);f.channel().writeAndFlush(tp3);f.channel().writeAndFlush(tp4);f.channel().writeAndFlush(tp5);f.channel().writeAndFlush(tp6);f.channel().writeAndFlush(tp7);f.channel().writeAndFlush(tp8);f.channel().writeAndFlush(tp9);f.channel().writeAndFlush(tp10);f.channel().writeAndFlush(tp11);f.channel().writeAndFlush(tp12);f.channel().closeFuture().syncUninterruptibly();} catch (InterruptedException e) {e.printStackTrace();} finally {workerGroup.shutdownGracefully();}}}

好了到这里就结束了netty之SpringBoot+Netty+Elasticsearch收集日志信息数据存储的学习,大家一定要跟着动手操作起来。需要的源码的 可si我获取;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1554869.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Go基础学习11-测试工具gomock和monkey的使用

文章目录 基础回顾MockMock是什么安装gomockMock使用1. 创建user.go源文件2. 使用mockgen生成对应的Mock文件3. 使用mockgen命令生成后在对应包mock下可以查看生成的mock文件4. 编写测试代码5. 运行代码并查看输出 GomonkeyGomonkey优势安装使用对函数进行monkey对结构体中方法…

SQL专项练习第二天

在数据处理和分析中&#xff0c;Hive 是一个强大的工具。本文将通过五个 Hive 相关的问题展示其在不同场景下的应用技巧。 先在home文件夹下建一个hivedata文件夹&#xff0c;把我们所需的数据写成txt文件导入到/home/hivedata/文件夹下面。 一、找出连续活跃 3 天及以上的用户…

茄子病虫害数据集。四类:果肉腐烂、蛀虫、健康、黄斑病。4000张图片,已经按照8:2的比例划分好训练集、验证集 txt格式 含类别yaml文件 已经标注好

茄子病虫害数据集。可用于筛选茄子品质、质量&#xff0c;训练采摘机器人视觉算法模型……数据集大部分图片来源于真实果园拍摄的图片&#xff08;生长在果树之上的&#xff09;&#xff0c;图片分辨率高&#xff0c;数据集分为四类&#xff1a;果肉腐烂、蛀虫、健康、黄斑病。…

Pandas数据分析基础

目录标题 Pandas读取和写入数据数据读取读取csv读取excel数据输出 Pandas基础操作索引数据信息统计计算位置计算数据选择 Pandas高级操作复杂查询类型转换数据排序添加修改高级过滤数据迭代高阶函数 Pandas读取和写入数据 Pandas将数据加载到DataFrame后&#xff0c;就可以使用…

算法知识点————贪心

贪心&#xff1a;只考虑局部最优解&#xff0c;不考虑全部最优解。有时候得不到最优解。 DP&#xff1a;考虑全局最优解。DP的特点&#xff1a;无后效性&#xff08;正在求解的时候不关心前面的解是怎么求的&#xff09;&#xff1b; 二者都是在求最优解的&#xff0c;都有最优…

TB6612电机驱动模块(STM32)

目录 一、介绍 二、模块原理 1.原理图 2.电机驱动原理 三、程序设计 main.c文件 Motor.h文件 Motor.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 TB6612FNG 是东芝半导体公司生产的一款直流电机驱动器件&#xff0c;它具有大电流 MOSFET-H 桥结构&#xff…

【每天学个新注解】Day 15 Lombok注解简解(十四)—@UtilityClass、@Helper

UtilityClass 生成工具类的注解 将一个类通过注解变成一个工具类&#xff0c;并没有什么用&#xff0c;本来代码中的工具类数量就极为有限&#xff0c;并不能达到减少重复代码的目的 1、如何使用 加在需要委托将其变为工具类的普通类上。 2、代码示例 例&#xff1a; Uti…

(C语言贪吃蛇)9.贪吃蛇撞墙找死

目录 游戏说明​ 1.撞墙死翘翘的情况 2.如何解决初始化问题 封装函数initSnake(); 注意事项 解决方法 总结 效果演示 游戏说明 玩家通过上下左右按键来控制小蛇的移动&#xff0c;我们之前的内容完成了小蛇每按下一次右键小蛇便向右移动一格&#xff0c;但是玩贪吃蛇一…

vue-live2d看板娘集成方案设计使用教程

文章目录 前言v1.1.x版本&#xff1a;vue集成看板娘&#xff08;暂不使用&#xff0c;在v1.2.x已替换&#xff09;集成看板娘实现看板娘拖拽效果方案资源备份存储 当前最新调研&#xff1a;2024.10.2开源方案1&#xff1a;OhMyLive2D&#xff08;推荐&#xff09;开源方案2&…

Spring Boot中线程池使用

说明&#xff1a;在一些场景&#xff0c;如导入数据&#xff0c;批量插入数据库&#xff0c;使用常规方法&#xff0c;需要等待较长时间&#xff0c;而使用线程池可以提高效率。本文介绍如何在Spring Boot中使用线程池来批量插入数据。 搭建环境 首先&#xff0c;创建一个Spr…

Agent 概念学习

Agent 概念学习 什么是 Agent OpenAI的研究员 Lilian 写过一篇博客:《 LLM Powered Autonomous Agents》&#xff0c;将 Agents 定义为&#xff1a;LLM memory planning skills tool use&#xff0c;即大语言模型、记忆、任务规划、工具使用的集合。 Overview of a LLM-…

多模态—图文匹配

可能最近大家已经发现了chatgpt可以根据自己的描述生成图片&#xff0c;其实这就是一个图文匹配的问题&#xff0c;可以理解为这是一个多模态的问题。 在模型训练时我们需要N个图片和N个文本对进行训练&#xff0c;文本通过text encoder形成文本语义向量&#xff0c;text enco…

930/105每日一题

算法 1 4,2,9,11, 4, 2,4 2,4,9 42 4 24 9 2&#xff08;0&#xff09; 4&#xff08;1&#xff09; 9&#xff08;2&#xff09; 11&#xff08;3&#xff09; 11&#xff08;0&#xff09;11&#xff08;1&#xff09; 9&#xff08;2&#xff09; 11&#xff08;3&#xf…

C++之多态篇(超详细版)

1.多态概念 多态就是多种形态&#xff0c;表示去完成某个行为时&#xff0c;当不同的人去完成时会有不同的形态&#xff0c;举个例子在车站买票&#xff0c;可以分为学生票&#xff0c;普通票&#xff0c;军人票&#xff0c;每种票的价格是不一样的&#xff0c;当你是不同的身…

C语言 | Leetcode C语言题解之第457题环形数组是否存在循环

题目&#xff1a; 题解&#xff1a; int next(int* nums, int numsSize, int cur) {return ((cur nums[cur]) % numsSize numsSize) % numsSize; // 保证返回值在 [0,n) 中 }bool circularArrayLoop(int* nums, int numsSize) {for (int i 0; i < numsSize; i) {if (!n…

C++ | Leetcode C++题解之第456题132模式

题目&#xff1a; 题解&#xff1a; class Solution { public:bool find132pattern(vector<int>& nums) {int n nums.size();vector<int> candidate_i {nums[0]};vector<int> candidate_j {nums[0]};for (int k 1; k < n; k) {auto it_i upper_…

Ubuntu24.04远程开机

近来在几台机器上鼓捣linux桌面&#xff0c;顺便研究一下远程唤醒主机。 本篇介绍Ubuntu系统的远程唤醒&#xff0c;Windows系统的唤醒可搜索相关资料。 依赖 有远程唤醒功能的路由器&#xff08;当前一般都带这个功能&#xff09;有线连接主机&#xff08;无线连接有兴趣朋友…

信息安全工程师(33)访问控制概述

前言 访问控制是信息安全领域中至关重要的一个环节&#xff0c;它提供了一套方法&#xff0c;旨在限制用户对某些信息项或资源的访问权限&#xff0c;从而保护系统和数据的安全。 一、定义与目的 定义&#xff1a;访问控制是给出一套方法&#xff0c;将系统中的所有功能和数据…

【JAVA开源】基于Vue和SpringBoot的宠物咖啡馆平台

本文项目编号 T 064 &#xff0c;文末自助获取源码 \color{red}{T064&#xff0c;文末自助获取源码} T064&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

仿RabbitMQ实现消息队列三种主题的调试及源码

文章目录 开源仓库和项目上线广播交换模式下的测试直接交换模式下的测试主题交换模式下的测试 开源仓库和项目上线 本项目已开源到下面链接下的仓库当中 仿RabbitMQ实现消息队列 广播交换模式下的测试 消费者客户端 在进行不同测试下&#xff0c;消费者客户端只需要改变交换机…