本文介绍如何在 Canal 1.1.4 中借助 RocketMQ 将 MySQL 的数据变更同步到 Redis。很多人对这一方案还不太熟悉,下面整理了所需依赖和核心代码,希望大家读完这篇文章能有所收获。
Canal结合RocketMQ同步MySQL
略
略
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<!-- 可选依赖:本文仅用它提供的 @Id 注解来标记主键字段,可根据个人需要引入 -->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
</dependency>
SQLType.java
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
/**
 * String constants naming the SQL statement types that the Canal listener reacts to.
 *
 * <p>The class is a pure constant holder and cannot be instantiated.
 *
 * @author Yu
 * @date 2019/09/08 00:18
 */
public class SQLType {

    /** Row insertion. */
    public static final String INSERT = "INSERT";

    /** Row update. */
    public static final String UPDATE = "UPDATE";

    /** Row deletion. */
    public static final String DELETE = "DELETE";

    /** Hidden constructor: this class only exposes constants. */
    private SQLType() {
    }
}
User.java
import lombok.Data;
import javax.persistence.Id;
import java.io.Serializable;
/**
 * User persistent object (PO).
 *
 * <p>Lombok's {@code @Data} generates the accessors, {@code equals}/{@code hashCode} and
 * {@code toString}. Instances are created from Canal row data by
 * {@code AbstractCanalMQ2RedisService#getData} and written to Redis as a whole object.
 *
 * <p>NOTE(review): the field names presumably mirror the column names of the source table —
 * confirm against the table definition.
 *
 * @author Yu
 * @date 2019/09/08 14:13
 **/
@Data
public class User implements Serializable {
private static final long serialVersionUID = -6845801275112259322L;
/** Identifier field; the {@code @Id} marker is what the sync service looks up reflectively to build the Redis key. */
@Id
private Integer uid;
/** User name. */
private String username;
/** NOTE(review): this value is serialized into Redis together with the rest of the object — confirm that is intended. */
private String password;
/** Sex; the set of allowed values is not visible here. */
private String sex;
}
CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage;
import java.util.Collection;
/**
 * Canal synchronization service.
 *
 * <p>Contract for components that consume Canal {@link FlatMessage}s and apply the contained
 * changes to a target store.
 *
 * @param <T> type of the objects handed to {@link #insert}, {@link #update} and {@link #delete}
 * @author Yu
 * @date 2019/09/08 00:00
 **/
public interface CanalSynService<T> {
/**
 * Processes one message.
 *
 * @param flatMessage the Canal message received from MQ
 */
void process(FlatMessage flatMessage);
/**
 * Handles a DDL statement.
 *
 * @param flatMessage the Canal message received from MQ
 */
void ddl(FlatMessage flatMessage);
/**
 * Handles inserted data.
 *
 * @param list the newly inserted objects
 */
void insert(Collection<T> list);
/**
 * Handles updated data.
 *
 * @param list the updated objects
 */
void update(Collection<T> list);
/**
 * Handles deleted data.
 *
 * @param list the deleted objects
 */
void delete(Collection<T> list);
}
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.google.common.collect.Sets;
import com.taco.springcloud.canal.constant.SQLType;
import com.taco.springcloud.core.component.ApplicationContextHolder;
import com.taco.springcloud.core.exception.BizException;
import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum;
import com.taco.springcloud.core.utils.JsonUtil;
import com.taco.springcloud.redis.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.util.ReflectionUtils;
import javax.annotation.Resource;
import javax.persistence.Id;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.*;
/**
 * Generic base service that applies Canal {@link FlatMessage}s received from MQ to Redis.
 *
 * <p>Each row carried by a message is converted into an instance of {@code T} and stored under
 * the key {@code <applicationName>:<modelName>:<id>}, where {@code id} is the value of the field
 * annotated with {@link Id}. Inserts and updates overwrite the key; deletes remove it.
 *
 * @param <T> entity type; it must declare (or inherit) exactly one field to be used as key,
 *            annotated with {@link Id}
 * @author Yu
 * @date 2019/09/08 00:05
 */
@Slf4j
public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private RedisUtils redisUtils;

    /**
     * Lazily resolved entity class. Volatile because listener callbacks may be invoked from
     * several consumer threads (assumption: depends on the listener configuration).
     */
    private volatile Class<T> cache;

    /** Lazily resolved {@link Id} field of the entity class, so reflection scanning happens once. */
    private volatile Field idField;

    /**
     * Returns the model name used as the middle segment of the Redis key.
     *
     * @return model name
     */
    protected abstract String getModelName();

    /**
     * Dispatches a message: DDL messages go to {@link #ddl}, row changes are converted with
     * {@link #getData} and routed to {@link #insert}, {@link #update} or {@link #delete}
     * according to the message type. Other types are ignored.
     *
     * @param flatMessage the Canal message received from MQ
     */
    @Override
    public void process(FlatMessage flatMessage) {
        // Compare against TRUE instead of unboxing: works whether the flag is a primitive or a
        // (possibly null) Boolean, and never throws on a missing flag.
        if (Boolean.TRUE.equals(flatMessage.getIsDdl())) {
            ddl(flatMessage);
            return;
        }
        String type = flatMessage.getType();
        Set<T> data = getData(flatMessage);
        if (SQLType.INSERT.equals(type)) {
            insert(data);
        } else if (SQLType.UPDATE.equals(type)) {
            update(data);
        } else if (SQLType.DELETE.equals(type)) {
            delete(data);
        }
    }

    /**
     * DDL hook; currently a no-op.
     *
     * @param flatMessage the Canal message received from MQ
     */
    @Override
    public void ddl(FlatMessage flatMessage) {
        //TODO : DDL must be synchronized as well (drop database -> flush, column changes)
    }

    /**
     * Stores the inserted objects in Redis.
     *
     * @param list the newly inserted objects
     */
    @Override
    public void insert(Collection<T> list) {
        insertOrUpdate(list);
    }

    /**
     * Overwrites the updated objects in Redis.
     *
     * @param list the updated objects
     */
    @Override
    public void update(Collection<T> list) {
        insertOrUpdate(list);
    }

    /**
     * Writes every object to Redis in one pipelined round trip, using the template's key and
     * value serializers. Does nothing for a null or empty collection.
     *
     * @param list objects to write
     */
    @SuppressWarnings("unchecked")
    private void insertOrUpdate(Collection<T> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        // The template is declared as RedisTemplate<String, Object>, so these casts only restore
        // the type parameters that getKeySerializer()/getValueSerializer() erase to a wildcard.
        RedisSerializer<String> keySerializer =
                (RedisSerializer<String>) redisTemplate.getKeySerializer();
        RedisSerializer<Object> valueSerializer =
                (RedisSerializer<Object>) redisTemplate.getValueSerializer();
        redisTemplate.executePipelined((RedisConnection redisConnection) -> {
            for (T data : list) {
                String key = getWrapRedisKey(data);
                redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data));
            }
            // A pipelined callback must return null; the results are collected by the template.
            return null;
        });
    }

    /**
     * Removes the keys of the deleted objects from Redis. Does nothing for a null or empty
     * collection, so no delete command is issued without keys.
     *
     * @param list the deleted objects
     */
    @Override
    public void delete(Collection<T> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        Set<String> keys = Sets.newHashSetWithExpectedSize(list.size());
        for (T data : list) {
            keys.add(getWrapRedisKey(data));
        }
        redisUtils.delAll(keys);
    }

    /**
     * Builds the Redis key {@code <applicationName>:<modelName>:<id>} for an object.
     *
     * @param t source object
     * @return the Redis key
     */
    protected String getWrapRedisKey(T t) {
        return ApplicationContextHolder.getApplicationName()
                + ":" + getModelName()
                + ":" + getIdValue(t);
    }

    /**
     * Returns the class bound to the type parameter {@code T}; resolved once and then cached.
     *
     * @return the entity class
     * @throws IllegalStateException if no class in the hierarchy binds a concrete class
     */
    @SuppressWarnings("unchecked")
    protected Class<T> getTypeArguement() {
        Class<T> resolved = cache;
        if (resolved == null) {
            // Safe by construction: the located argument is what the subclass bound to T.
            resolved = (Class<T>) resolveTypeArgument();
            cache = resolved;
        }
        return resolved;
    }

    /**
     * Walks from the runtime class towards this base class and returns the first concrete class
     * found as the leading type argument of a generic superclass. Starting at the runtime class
     * keeps the direct-subclass case unchanged, while continuing upwards also covers runtime
     * subclasses (for example proxies) and non-generic intermediate subclasses.
     */
    private Class<?> resolveTypeArgument() {
        for (Class<?> current = getClass();
                current != null && current != AbstractCanalMQ2RedisService.class;
                current = current.getSuperclass()) {
            if (current.getGenericSuperclass() instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) current.getGenericSuperclass();
                Object argument = parameterized.getActualTypeArguments()[0];
                if (argument instanceof Class) {
                    return (Class<?>) argument;
                }
            }
        }
        throw new IllegalStateException(
                "Cannot resolve the entity type argument of " + getClass().getName());
    }

    /**
     * Reads the value of the {@link Id}-annotated field from an object.
     *
     * @param t the object
     * @return the id value
     */
    protected Object getIdValue(T t) {
        Field fieldOfId = getIdField();
        ReflectionUtils.makeAccessible(fieldOfId);
        return ReflectionUtils.getField(fieldOfId, t);
    }

    /**
     * Returns the field of the entity class annotated with {@link Id}; looked up once and then
     * cached. Fields declared on superclasses of the entity are considered as well.
     *
     * @return the id field
     * @throws BizException if no field carries the annotation
     */
    protected Field getIdField() {
        Field resolved = idField;
        if (resolved == null) {
            resolved = findIdField(getTypeArguement());
            idField = resolved;
        }
        return resolved;
    }

    /** Scans the class and its superclasses for the first field annotated with {@link Id}. */
    private Field findIdField(Class<?> type) {
        for (Class<?> current = type;
                current != null && current != Object.class;
                current = current.getSuperclass()) {
            for (Field field : current.getDeclaredFields()) {
                if (field.getAnnotation(Id.class) != null) {
                    return field;
                }
            }
        }
        log.error("PO类未设置@Id注解");
        throw new BizException(BaseApiCodeEnum.FAIL);
    }

    /**
     * Converts the {@code data} rows of a message into entity objects.
     *
     * @param flatMessage the Canal message received from MQ
     * @return the converted objects; an empty, mutable set when the message carries no rows
     */
    protected Set<T> getData(FlatMessage flatMessage) {
        List<Map<String, String>> sourceData = flatMessage.getData();
        if (sourceData == null || sourceData.isEmpty()) {
            return new HashSet<>();
        }
        Class<T> entityClass = getTypeArguement();
        Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size());
        for (Map<String, String> map : sourceData) {
            T t = JsonUtil.mapConvertPojo(map, entityClass);
            targetData.add(t);
        }
        return targetData;
    }
}
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.taco.springcloud.canal.model.User;
import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * RocketMQ listener for the {@code test_users} topic (consumer group {@code users}).
 *
 * <p>Every received Canal {@link FlatMessage} is handed to the inherited
 * {@code process} method, which synchronizes {@link User} rows under the model name
 * {@code user}.
 */
@Slf4j
@Service
@RocketMQMessageListener(topic = "test_users", consumerGroup = "users")
public class TestUsersConsumer extends AbstractCanalMQ2RedisService<User> implements RocketMQListener<FlatMessage> {

    /** Model name used as the middle segment of the Redis key. */
    private String modelName = "user";

    /**
     * Supplies the model name required by the base service.
     *
     * @return the model name
     */
    @Override
    public String getModelName() {
        return this.modelName;
    }

    /**
     * Forwards the received message to the synchronization logic of the base class.
     *
     * @param flatMessage the Canal message delivered by RocketMQ
     */
    @Override
    public void onMessage(FlatMessage flatMessage) {
        this.process(flatMessage);
    }
}
看完上述内容,你们对Canal1.1.4中怎么使用RocketMQ将MySQL同步到Redis有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
亿速云「云数据库 MySQL」免部署即开即用,比自行安装部署数据库高出1倍以上的性能,双节点冗余防止单节点故障,数据自动定期备份随时恢复。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/teddyIH/blog/3103573