温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Go怎么批量操作excel导入到mongodb中

发布时间:2022-03-24 14:04:56 来源:亿速云 阅读:120 作者:iii 栏目:开发技术

本文小编为大家详细介绍“Go怎么批量操作excel导入到mongodb中”,内容详细,步骤清晰,细节处理妥当,希望这篇“Go怎么批量操作excel导入到mongodb中”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

    需求:完成一个命令工具,批量处理某个目录下面的一些excel,将这些excel数据导入到mongodb,同时可以同步到mysql

    代码目录:

    ├─cmd
    |  └─ecc.go     # 命令
    ├─configs
    ├─data
    ├─internal
    │  └─importing  # 主要逻辑处理
    ├─pkg           # 处理文件读取、连接数据库等
    │  ├─files
    │  ├─mongo
    │  └─mysql
    ├─queue
    └─tools

    1. 选择命令行包

    平常使用的的命令工具包有:

    • urfave/cli

    • spf13/cobra

    这里使用的是urfave/cli包,比较简单

    var DirPath = "../data"     // 默认位置
    var dir = DirPath
    app := &cli.App{
    		Name:  "Ecc",
    		Usage: "Ecc is a tools for batch processing of excel data",
    		Flags: []cli.Flag{
    			&cli.StringFlag{
    				Name:        "model",
    				Aliases:     []string{"m"},
    				Usage:       "The model of searching",
    				Value:       "model",
    				Destination: &model,
    			},
    			&cli.StringFlag{    // 设置一个 -d 的参数,用来确定目标文件夹位置
    				Name:        "dir",
    				Aliases:     []string{"d"},
    				Usage:       "Folder location of data files",
    				Destination: &dir,
    				Value:       DirPath,
    		},
    		Action: func(c *cli.Context) error {
    			importing.Load("../configs/cfg.yaml")  // 引入配置文件,读取mongodb、mysql等配置
    			importing.Handle(dir)  ## 具体逻辑处理
    			return nil
    	}

    2. 读取配置,连接数据库

    读取配置使用spf13/viper库,需要读取一下配置,连接mongodb

    var C Config
    
    type Config struct {
    	Env   string `yaml:"env"`
    	Mongo struct {
    		DNS        string `yaml:"dns"`
    		Db         string `yaml:"db"`
    		Collection string `yaml:"collection"`
    	} `yaml:"mongo"`
    	Mysql struct {
    		Alias string `yaml:"alias"`
    		Dns   string `yaml:"dns"`
    	} `yaml:"mysql"`
    }
    func Load(cf string) {
    	var err error
    	viper.SetConfigFile(cf)
    	if err = viper.ReadInConfig(); err != nil {
    		log.Fatal(fmt.Errorf("fatal error config file: %s \n", err))
    	}
    	if err = viper.Unmarshal(&configs.C); err != nil {
    		log.Fatal(fmt.Errorf("unmarshal conf failed, err:%s \n", err))
    	if err = mongo.Conn(configs.C.Mongo.DNS, configs.C.Mongo.Db); err != nil {
    		log.Fatal(color.RedString("%s:\n%v", "mongo connect err", err))
    	if mongo.CheckCollection(configs.C.Mongo.Collection) {
    		if err = mongo.DelCollection(configs.C.Mongo.Collection); err != nil {
    			log.Fatal(color.RedString("%s:\n%v", "mongo del collection err", err))
    		}
    	if err = mongo.CreateCollection(configs.C.Mongo.Collection); err != nil {
    		log.Fatal(color.RedString("%s:\n%v", "mongo create collection err", err))

    3. 读取文件

    先确定文件权限以及文件是否存在

    func ReadDir(dir string) ([]os.FileInfo, error) {
    	perm := checkPermission(dir)
    	if perm == true {
    		return nil, fmt.Errorf("permission denied dir: %s", dir)
    	}
    
    	if isNotExistDir(dir) {
    		return nil, fmt.Errorf("does not exist dir: %s", dir)
    	files, err := ioutil.ReadDir(dir)
    	if err == nil {
    		return files, err
    	return nil, fmt.Errorf("ReadDir: %s, err: %v", dir, err)
    }

    拿到文件后就要并发读取每个excel文件数据

    这里需求是一次任务必须读完所有的文件,任何一个文件有错误就退出程序。

    :: 所以需要定义异常退出信道和一个完成读取两个信道,总的数据使用sync.Map安全并发写入。

    3.1. 并发读

    rWait   = true
    rDone   = make(chan struct{})
    rCrash  = make(chan struct{})
    
    read(f, dir, data)
    for rWait {  		// 使用for循环来阻塞读文件
    	select {
    	case <-rCrash:
    		abort("-> Failure")
    		return
    	case <-rDone:
    		rWait = false
    	}
    }
    func read(fs []os.FileInfo, dir string, data *sync.Map) {
    	for _, file := range fs {
    		fileName := file.Name()
    		_ext := filepath.Ext(fileName)
    		if Include(strings.Split(Exts, ","), _ext) {
    			wg.Add(1)
    			inCh := make(chan File)
    			go func() {
    				defer wg.Done()
    				select {
    				case <-rCrash:
    					return // 退出goroutine
    				case f := <-inCh:
    					e, preData := ReadExcel(f.FilePath, f.FileName, pb)
    					if e != nil {
    						tools.Red("%v", e)
    						// 使用sync.once防止多个goroutine关闭同一个信道
    						once.Do(func() { 
    							close(rCrash)
    						})
    						return
    					}
    					data.Store(f.FileName, preData)
    				}
    			}()
    				inCh <- File{
    					FileName: fileName,
    					FilePath: dir + string(os.PathSeparator) + fileName,
    		}
    	go func() {
    		wg.Wait()
    		close(rDone)
    	}()

    3.2. 使用excelize处理excel

    excelize是一个非常好用的excel处理库,这里使用这个库读取excel文件内容

    type ExcelPre struct {
    	FileName    string
    	Data        [][]string
    	Fields      []string
    	Prefixes    string
    	ProgressBar *mpb.Bar  // 进度条对象
    }
    
    func ReadExcel(filePath, fileName string, pb *mpb.Progress) (err error, pre *ExcelPre) {
    	f, err := excelize.OpenFile(filePath)
    	if err != nil {
    		return err, nil
    	}
    	defer func() {
    		if _e := f.Close(); _e != nil {
    			fmt.Printf("%s: %v.\n\n", filePath, _e)
    		}
    	}()
    	// 获取第一页数据
    	firstSheet := f.WorkBook.Sheets.Sheet[0].Name
    	rows, err := f.GetRows(firstSheet)
    	lRows := len(rows)
    	if lRows < 2 {
    		lRows = 2
    	rb := ReadBar(lRows, filePath, pb)
    	wb := WriteBar(lRows-2, filePath, rb, pb)
    	var fields []string
    	var data [][]string
            // 进度条增加一格
    	InCr := func(start time.Time) {
    		rb.Increment()
    		rb.DecoratorEwmaUpdate(time.Since(start))
    	for i := 0; i < lRows; i++ {
    		InCr(time.Now())
    		// 这里对第一行处理,用来判断一些约定的条件
    		if i == 0 {
    			fields = rows[i]
    			for index, field := range fields {
    				if isChinese := regexp.MustCompile("[\u4e00-\u9fa5]"); isChinese.MatchString(field) || field == "" {
    					err = errors.New(fmt.Sprintf("%s: line 【A%d】 field 【%s】 \n", filePath, index, field) + "The first line of the file is not a valid attribute name.")
    					return err, nil
    				}
    			}
    			continue
    		// 过滤第二行,这一行通常是中文解释字段
    		if i == 1 {
    		data = append(data, rows[i])
    	return nil, &ExcelPre{
    		FileName:    fileName,
    		Data:        data,
    		Fields:      fields,
    		Prefixes:    Prefix(fileName),
    		ProgressBar: wb,

    3.3. 使用mpb在命令行输出进度显示

    mpb是一个很好用的命令行进度输出库,上面代码里里有两个进度条,一个是读进度条,第二个是写进度条,读进度条在文件读取的时候就显示了,返回的结构体里有写进度条对象,便于后面写操作时候显示。

    下面是两个进度条显示的配置,具体参数可以看这个库的文档。

    func ReadBar(total int, name string, pb *mpb.Progress) *mpb.Bar {
    	return pb.AddBar(int64(total),
    		mpb.PrependDecorators(
    			decor.OnComplete(decor.Name(color.YellowString("reading"), decor.WCSyncSpaceR), color.YellowString("waiting")),
    			decor.CountersNoUnit("%d / %d", decor.WCSyncWidth, decor.WCSyncSpaceR),
    		),
    		mpb.AppendDecorators(
    			decor.NewPercentage("%.2f:", decor.WCSyncSpaceR),
    			decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth),
    			decor.Name(": "+name),
    	)
    }
    
    func WriteBar(total int, name string, beforeBar *mpb.Bar, pb *mpb.Progress) *mpb.Bar {
    		mpb.BarQueueAfter(beforeBar, false),
    		mpb.BarFillerClearOnComplete(),
    			decor.OnComplete(decor.Name(color.YellowString("writing"), decor.WCSyncSpaceR), color.GreenString("done")),
    			decor.OnComplete(decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), ""),
    			decor.OnComplete(decor.NewPercentage("%.2f:", decor.WCSyncSpaceR), ""),
    			decor.OnComplete(decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth), ""),
    			decor.OnComplete(decor.Name(": "+name), name),

    4. 写入mongodb

    同写入操作,这里拿到所有数据,然后使用goroutine并发写入mongodb,在处理数据时候需要查重,还需要记录一下本次操作插入了哪些数据的_id值,在报错的时候进行删除(这里可以使用事务,直接删除简单些),所以定义了一个Shuttle结构体用来在记录并发时的数据。

    wWait   = true
    wDone   = make(chan struct{})
    wCrash  = make(chan struct{})
    
    type Shuttle struct {
    	Hid []string  	// 用来判断是否是重复数据
    	Mid []string  	// 用来记录本次插入的数据_id
    	mu  sync.Mutex
    }
    func (s *Shuttle) Append(t string, str string) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	switch t {
    	case "h":
    		s.Hid = append(s.Hid, str)
    	case "m":
    		s.Mid = append(s.Mid, str)
    	}
    write2mongo(data)
    for wWait {
    	select {
    	case <-wCrash:
    		abort("-> Failure")
    		return
    	case <-wDone:
    		wWait = false
    func write2mongo(data *sync.Map) {
    	collection := mongo.GetCollection(configs.C.Mongo.Collection)
    	data.Range(func(key, value interface{}) bool {
    		if v, ok := value.(*ExcelPre); ok {
    			wg.Add(1)
    			inCh := make(chan []bson.M)
    			go func() {
    				defer wg.Done()
    				select {
    				case <-wCrash:
    					return // exit
    				case rows := <-inCh:
    					e := Write2Mongo(rows, collection, v, &shuttle)
    					if e != nil {
    						tools.Red("%v", e)
    						once.Do(func() {
    							close(wCrash)
    						})
    						return
    					}
    				}
    			}()
    				inCh <- PreWrite(v)
    		}
    		return true
    	})
    	go func() {
    		wg.Wait()
    		close(wDone)
    	}()
    // 具体处理逻辑
    func Write2Mongo(rows []bson.M, collection *mongoDb.Collection, v *ExcelPre, s *Shuttle) error {
    	v.ProgressBar.SetCurrent(0)
    	incr := func(t time.Time, b *mpb.Bar, n int64) {
    		b.IncrInt64(n)
    		b.DecoratorEwmaUpdate(time.Since(t))
    	for _, row := range rows {
    		start := time.Now()
    		key := v.Prefixes + "@@" + row["_hid"].(string)
    		s.mu.Lock()
    		if Include(s.Hid, key) {
    			s.mu.Unlock()
    			incr(start, v.ProgressBar, 1)
    			continue
    		} else {
    			s.Hid = append(s.Hid, key)
    		var err error
    		var id primitive.ObjectID
    		if id, err = mongo.CreateDocs(collection, row); err != nil {
    			return errors.New(fmt.Sprintf("%s:\n%v", "mongo create docs err", err))
    		s.Append("m", id.Hex())
    		incr(start, v.ProgressBar, 1)
    	return nil

    5. 同步mysql

    因为同步mysql不是必要的,这里使用命令行输入进行判断:

    tools.Yellow("-> Whether to sync data to mysql? (y/n)")
    if !tools.Scan("aborted") {
    	return
    } else {
    	tools.Yellow("-> Syncing data to mysql...")
    	if err = write2mysql(); err != nil {
    		tools.Red("-> Failure:" + err.Error())
    	} else {
    		tools.Green("-> Success.")
    	}
    }

    连接mysql数据库,拿到当前monogodb的数据:

    func write2mysql() error {
    	if err := mysql.Conn(configs.C.Mysql.Dns); err != nil {
    		return err
    	}
    
    	d, err := mongo.GetCollectionAllData(configs.C.Mongo.Collection)
    	if err != nil {
    	err = Write2Mysql(d)
    	return err
    }

    创建表,直接拼sql就完事了:

    func CreateTable(tableName string, fields []string) error {
    	var err error
    	delSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableName)
    	err = Db.Exec(delSql).Error
    	if err != nil {
    		return err
    	}
    
    	s := "id bigint(20) NOT NULL PRIMARY KEY"
    	for _, field := range fields {
    		s += fmt.Sprintf(",%s varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL", field)
    	sql := fmt.Sprintf("CREATE TABLE `%s` (%s) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci", tableName, s)
    	err = Db.Exec(sql).Error
    	return nil
    }

    插入数据,bson.M本身就是一个map,转一下使用gorm分批插入数据,速度快一点:

    func InsertData(tableName string, fields []string, data []bson.M) error {
    	var err error
    	var maps []map[string]interface{}
    	for _, d := range data {
    		row := make(map[string]interface{})
    		for _, field := range fields {
    			row[field] = d[field]
    		}
    		if row != nil {
    			row["id"] = d["id"].(string)
    			maps = append(maps, row)
    	}
    
    	if len(maps) > 0 {
    		err = Db.Table(tableName).CreateInBatches(maps, 100).Error
    		if err != nil {
    			return err
    	return err
    }

    读到这里,这篇“Go怎么批量操作excel导入到mongodb中”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注亿速云行业资讯频道。

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI