今天跟大家聊聊如何在 Canal 1.1.4 中使用 RocketMQ 将 MySQL 的数据同步到 Redis。可能很多人对此还不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家读完这篇文章后能够有所收获。
Canal结合RocketMQ同步MySQL
略
略
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <!-- Optional: add only if your project needs it. NOTE(review): no version is declared for this dependency; presumably it is supplied by a parent POM/BOM, confirm. --> <dependency> <groupId>javax.persistence</groupId> <artifactId>persistence-api</artifactId> </dependency>
SQLType.java
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
 * SQL statement types distinguished by the Canal listener.
 *
 * <p>Pure constant holder: the Lombok-generated no-args constructor is private,
 * so the class cannot be instantiated from outside.
 *
 * @author Yu
 * @date 2019/09/08 00:18
 **/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SQLType {

    /** Insert statement. */
    public static final String INSERT = "INSERT";

    /** Update statement. */
    public static final String UPDATE = "UPDATE";

    /** Delete statement. */
    public static final String DELETE = "DELETE";
}
User.java
import lombok.Data;

import javax.persistence.Id;
import java.io.Serializable;

/**
 * User persistent object (PO).
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @author Yu
 * @date 2019/09/08 14:13
 **/
@Data
public class User implements Serializable {

    private static final long serialVersionUID = -6845801275112259322L;

    /** Identifier of the user; the only field carrying {@code @Id}. */
    @Id
    private Integer uid;

    /** User name. */
    private String username;

    /** Password. */
    private String password;

    /** Sex. */
    private String sex;
}
CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage;

import java.util.Collection;

/**
 * Canal synchronisation service.
 *
 * @param <T> type of the objects handed to the insert/update/delete callbacks
 * @author Yu
 * @date 2019/09/08 00:00
 **/
public interface CanalSynService<T> {

    /**
     * Handles one message delivered by Canal through the MQ.
     *
     * @param flatMessage Canal MQ payload
     */
    void process(FlatMessage flatMessage);

    /**
     * Handles a DDL statement.
     *
     * @param flatMessage Canal MQ payload
     */
    void ddl(FlatMessage flatMessage);

    /**
     * Handles inserted rows.
     *
     * @param list newly added data
     */
    void insert(Collection<T> list);

    /**
     * Handles updated rows.
     *
     * @param list updated data
     */
    void update(Collection<T> list);

    /**
     * Handles deleted rows.
     *
     * @param list deleted data
     */
    void delete(Collection<T> list);
}
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.google.common.collect.Sets; import com.taco.springcloud.canal.constant.SQLType; import com.taco.springcloud.core.component.ApplicationContextHolder; import com.taco.springcloud.core.exception.BizException; import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum; import com.taco.springcloud.core.utils.JsonUtil; import com.taco.springcloud.redis.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import javax.persistence.Id; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.*; /** * 抽象CanalMQ通用处理服务 * * @author Yu * @date 2019/09/08 00:05 **/ @Slf4j public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private RedisUtils redisUtils; private Class<T> cache; /** * 获取Model名称 * * @return Model名称 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if(flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set<T> data = getData(flatMessage); if(SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if(SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if(SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,删库清空,更新字段处理 } @Override public void insert(Collection<T> list) { insertOrUpdate(list); } @Override public void update(Collection<T> list) { insertOrUpdate(list); } private void insertOrUpdate(Collection<T> list) { redisTemplate.executePipelined( (RedisConnection redisConnection) -> { for (T data : list) { String key = 
getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection<T> list) { Set<String> keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set<String> keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisUtils.delAll(keys); } /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(T t) { return new StringBuilder() .append(ApplicationContextHolder.getApplicationName()) .append(":") .append(getModelName()) .append(":") .append(getIdValue(t)) .toString(); } /** * 获取类泛型 * * @return 泛型Class */ protected Class<T> getTypeArguement() { if(cache == null) { cache = (Class<T>) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return cache; } /** * 获取Object标有@Id注解的字段值 * * @param t 对象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 获取Class标有@Id注解的字段名称 * * @return id字段名称 */ protected Field getIdField() { Class<T> clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO类未设置@Id注解"); throw new BizException(BaseApiCodeEnum.FAIL); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set<T> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { T t = JsonUtil.mapConvertPojo(map, getTypeArguement()); 
targetData.add(t); } return targetData; } }
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.taco.springcloud.canal.model.User; import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "test_users", consumerGroup = "users") public class TestUsersConsumer extends AbstractCanalMQ2RedisService<User> implements RocketMQListener<FlatMessage> { @Getter private String modelName = "user"; @Override public void onMessage(FlatMessage s) { process(s); } }
看完上述内容,你们对在 Canal 1.1.4 中使用 RocketMQ 将 MySQL 同步到 Redis 是否有了进一步的了解?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。