Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
1.9 kB
2
Indexable
Never

func NewReadCorporateAccountsDataActivity(
	logger log.Logger,
	httpFactory client.HTTPClientFactory,
	clientAuth client_middleware.AuthIn,
	storageFactory storage.StorageFactory,
	pathService storage.PathService,
) (ReadCorporateAccountsDataActivity, error) {
	s3Storage, err := storageFactory.CreateByType(storage.StorageTypeS3)
	fsStorage, err := storageFactory.CreateByType(storage.StorageTypeFile)
	objStorage := storage.NewObjectStorage(fsStorage, s3Storage, pathService, logger)

	return func(ctx context.Context, req *pbv1.ReadCorporateAccountsDataActivityRequest) (*pbv1.CorporateAccountInfoList, error) {
		_, err := os.Stat(req.Sha256 + ".csv")
		if err != nil {
			logger.Error(err)
			if !errors.Is(err, os.ErrNotExist) {
				return nil, err
			}

			stream, err := objStorage.Open(ctx, req.Sha256)
			if err != nil {
				logger.Error(err)
				return nil, err
			}
			defer stream.Close()

			err = fsStorage.Store(ctx, storage.NewPath(req.Sha256+".csv"), stream)
		}

		file, err := fsStorage.Open(ctx, storage.NewPath(req.Sha256+".csv"))
		if err != nil {
			logger.Error(err)
			return nil, err
		}
		defer file.Close()

		begin := req.Paginator.Offset
		end := begin + req.Paginator.Limit

		var items []*pbv1.CorporateAccountInfo

		reader := csv.NewReader(file)
		for i := 1; i <= int(end); i++ {
			record, err := reader.Read()
			if err == io.EOF {
				break
			}

			if err != nil {
				logger.Error(err)
				return nil, err
			}

			if i < int(begin) {
				continue
			}

			if len(record) != 1 {
				err = fmt.Errorf("invalid amount of columns in row %d", i)
				logger.Error(err)
				return nil, err
			}

			items = append(items, &pbv1.CorporateAccountInfo{
				ExternalId: record[0],
			})
		}

		logger.Debugf("read %d rows from %s (limit %d offset %d)", len(items), req.Url, req.Paginator.Limit, req.Paginator.Offset)

		return &pbv1.CorporateAccountInfoList{Items: items}, nil
	}, nil
}